LCOV - code coverage report
Current view: top level - disco/bundle - fd_bundle_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 687 0.0 %
Date: 2026-03-19 18:19:27 Functions: 0 36 0.0 %

          Line data    Source code
       1             : /* fd_bundle_client.c steps gRPC related tasks. */
       2             : 
       3             : #define _GNU_SOURCE /* SOL_TCP */
       4             : #include "fd_bundle_auth.h"
       5             : #include "fd_bundle_tile_private.h"
       6             : #include "fd_bundle_tile.h"
       7             : #include "proto/block_engine.pb.h"
       8             : #include "proto/bundle.pb.h"
       9             : #include "proto/packet.pb.h"
      10             : #include "../fd_txn_m.h"
      11             : #include "../../waltz/h2/fd_h2_conn.h"
      12             : #include "../../waltz/http/fd_url.h" /* fd_url_unescape */
      13             : #include "../../waltz/openssl/fd_openssl.h"
      14             : #include "../../ballet/base58/fd_base58.h"
      15             : #include "../../ballet/nanopb/pb_decode.h"
      16             : #include "../../util/net/fd_ip4.h"
      17             : 
      18             : #include <fcntl.h>
      19             : #include <errno.h>
      20             : #include <unistd.h> /* close */
      21             : #include <poll.h> /* poll */
      22             : #include <sys/socket.h> /* socket */
      23             : #include <netinet/in.h>
      24             : #include <netinet/ip.h>
      25             : #include <netinet/tcp.h>
      26             : 
      27           0 : #define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */
      28             : 
      29             : 
      30             : __attribute__((weak)) long
      31           0 : fd_bundle_now( void ) {
      32           0 :   return fd_log_wallclock();
      33           0 : }
      34             : 
      35             : void
      36           0 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
      37           0 :   if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) {
      38           0 :     if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
      39           0 :       FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
      40           0 :     }
      41           0 :     ctx->tcp_sock = -1;
      42           0 :     ctx->tcp_sock_connected = 0;
      43           0 :   }
      44           0 :   ctx->defer_reset = 0;
      45             : 
      46           0 :   ctx->builder_info_avail       = 0;
      47           0 :   ctx->builder_info_wait        = 0;
      48           0 :   ctx->packet_subscription_live = 0;
      49           0 :   ctx->packet_subscription_wait = 0;
      50           0 :   ctx->bundle_subscription_live = 0;
      51           0 :   ctx->bundle_subscription_wait = 0;
      52             : 
      53           0 :   fd_memset( ctx->rtt, 0, sizeof(fd_rtt_estimate_t) );
      54             : 
      55           0 : # if FD_HAS_OPENSSL
      56           0 :   if( FD_UNLIKELY( ctx->ssl ) ) {
      57           0 :     SSL_free( ctx->ssl );
      58           0 :     ctx->ssl = NULL;
      59           0 :   }
      60           0 : # endif
      61             : 
      62           0 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
      63             : 
      64           0 :   fd_bundle_auther_reset( &ctx->auther );
      65           0 :   fd_grpc_client_reset( ctx->grpc_client );
      66           0 : }
      67             : 
      68             : static int
      69             : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
      70           0 :                              uint                     ip4_addr ) {
      71           0 :   struct sockaddr_in addr = {
      72           0 :     .sin_family      = AF_INET,
      73           0 :     .sin_addr.s_addr = ip4_addr,
      74           0 :     .sin_port        = fd_ushort_bswap( ctx->server_tcp_port )
      75           0 :   };
      76           0 :   int err = connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) );
      77             :   /* FD_LIKELY is used here as EINPROGRESS is expected even to local tcp ports */
      78           0 :   if( FD_LIKELY( err==-1 ) ) {
      79           0 :     return errno;
      80           0 :   }
      81           0 :   return 0;
      82           0 : }
      83             : 
      84             : static int
      85           0 : fd_bundle_client_get_connect_result( fd_bundle_tile_t const * ctx ) {
      86           0 :   int so_err = 0;
      87           0 :   socklen_t so_err_sz = sizeof(so_err);
      88           0 :   if( FD_UNLIKELY( getsockopt( ctx->tcp_sock, SOL_SOCKET, SO_ERROR, &so_err, &so_err_sz )==-1 ) ) {
      89           0 :     return errno;
      90           0 :   }
      91           0 :   return so_err;
      92           0 : }
      93             : 
      94             : static void
      95           0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
      96           0 :   fd_bundle_client_reset( ctx );
      97             : 
      98             :   /* FIXME IPv6 support */
      99           0 :   fd_addrinfo_t hints = {0};
     100           0 :   hints.ai_family = AF_INET;
     101           0 :   fd_addrinfo_t * res = NULL;
     102           0 :   uchar scratch[ 4096 ];
     103           0 :   void * pscratch = scratch;
     104           0 :   int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
     105           0 :   if( FD_UNLIKELY( err ) ) {
     106           0 :     FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
     107           0 :     fd_bundle_client_reset( ctx );
     108           0 :     ctx->metrics.transport_fail_cnt++;
     109           0 :     return;
     110           0 :   }
     111           0 :   uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
     112           0 :   ctx->server_ip4_addr = ip4_addr;
     113             : 
     114           0 :   int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
     115           0 :   if( FD_UNLIKELY( tcp_sock<0 ) ) {
     116           0 :     FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     117           0 :   }
     118           0 :   ctx->tcp_sock = tcp_sock;
     119             : 
     120           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
     121           0 :     FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
     122           0 :   }
     123             : 
     124           0 :   int tcp_nodelay = 1;
     125           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
     126           0 :     FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     127           0 :   }
     128             : 
     129           0 :   if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) {
     130           0 :     FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     131           0 :   }
     132             : 
     133           0 :   char const * scheme = "http";
     134           0 : # if FD_HAS_OPENSSL
     135           0 :   if( ctx->is_ssl ) scheme = "https";
     136           0 : # endif
     137             : 
     138           0 :   FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
     139           0 :                 scheme,
     140           0 :                 FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     141           0 :                 (int)ctx->server_sni_len, ctx->server_sni ));
     142             : 
     143           0 :   int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
     144             :   /* FD_LIKELY as EINPROGRESS is expected */
     145           0 :   if( FD_LIKELY( connect_err ) ) {
     146           0 :     if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) {
     147           0 :       FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
     148           0 :                       FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     149           0 :                       connect_err, fd_io_strerror( connect_err ) ));
     150           0 :       fd_bundle_client_reset( ctx );
     151           0 :       ctx->metrics.transport_fail_cnt++;
     152           0 :       return;
     153           0 :     }
     154           0 :   }
     155             : 
     156           0 : # if FD_HAS_OPENSSL
     157           0 :   if( ctx->is_ssl ) {
     158           0 :     BIO * bio = fd_openssl_bio_new_socket( ctx->tcp_sock, BIO_NOCLOSE );
     159           0 :     if( FD_UNLIKELY( !bio ) ) {
     160           0 :       FD_LOG_ERR(( "fd_openssl_bio_new_socket failed" ));
     161           0 :     }
     162             : 
     163           0 :     SSL * ssl = SSL_new( ctx->ssl_ctx );
     164           0 :     if( FD_UNLIKELY( !ssl ) ) {
     165           0 :       FD_LOG_ERR(( "SSL_new failed" ));
     166           0 :     }
     167             : 
     168           0 :     SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
     169           0 :     SSL_set_connect_state( ssl );
     170             : 
     171             :     /* Indicate to endpoint which server name we want */
     172           0 :     if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
     173           0 :       FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
     174           0 :     }
     175             : 
     176             :     /* Enable hostname verification */
     177           0 :     if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
     178           0 :       FD_LOG_ERR(( "SSL_set1_host failed" ));
     179           0 :     }
     180             : 
     181           0 :     ctx->ssl = ssl;
     182           0 :   }
     183           0 : # endif /* FD_HAS_OPENSSL */
     184             : 
     185           0 :   fd_grpc_client_reset( ctx->grpc_client );
     186           0 :   fd_keepalive_init( ctx->keepalive, ctx->rng, ctx->keepalive_interval, ctx->keepalive_interval, fd_bundle_now() );
     187           0 : }
     188             : 
     189             : static int
     190             : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
     191           0 :                            int *              charge_busy ) {
     192           0 : # if FD_HAS_OPENSSL
     193           0 :   if( ctx->is_ssl ) {
     194           0 :     return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
     195           0 :   }
     196           0 : # endif /* FD_HAS_OPENSSL */
     197             : 
     198           0 :   return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy );
     199           0 : }
     200             : 
     201             : static void
     202           0 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
     203           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     204             : 
     205           0 :   block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
     206           0 :   static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
     207           0 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     208           0 :       ctx->grpc_client,
     209           0 :       path, sizeof(path)-1,
     210           0 :       FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
     211           0 :       &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
     212           0 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     213           0 :       0 /* is_streaming */
     214           0 :   );
     215           0 :   if( FD_UNLIKELY( !request ) ) return;
     216           0 :   fd_grpc_client_deadline_set(
     217           0 :       request,
     218           0 :       FD_GRPC_DEADLINE_RX_END,
     219           0 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     220             : 
     221           0 :   ctx->builder_info_wait = 1;
     222           0 : }
     223             : 
     224             : static void
     225           0 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
     226           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     227             : 
     228           0 :   block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
     229           0 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
     230           0 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     231           0 :       ctx->grpc_client,
     232           0 :       path, sizeof(path)-1,
     233           0 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
     234           0 :       &block_engine_SubscribePacketsRequest_msg, &req,
     235           0 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     236           0 :       0 /* is_streaming */
     237           0 :   );
     238           0 :   if( FD_UNLIKELY( !request ) ) return;
     239           0 :   fd_grpc_client_deadline_set(
     240           0 :       request,
     241           0 :       FD_GRPC_DEADLINE_HEADER,
     242           0 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     243             : 
     244           0 :   ctx->packet_subscription_wait = 1;
     245           0 : }
     246             : 
     247             : static void
     248           0 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
     249           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     250             : 
     251           0 :   block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
     252           0 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
     253           0 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     254           0 :       ctx->grpc_client,
     255           0 :       path, sizeof(path)-1,
     256           0 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
     257           0 :       &block_engine_SubscribeBundlesRequest_msg, &req,
     258           0 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     259           0 :       0 /* is_streaming */
     260           0 :   );
     261           0 :   if( FD_UNLIKELY( !request ) ) return;
     262           0 :   fd_grpc_client_deadline_set(
     263           0 :       request,
     264           0 :       FD_GRPC_DEADLINE_HEADER,
     265           0 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     266             : 
     267           0 :   ctx->bundle_subscription_wait = 1;
     268           0 : }
     269             : 
     270             : void
     271           0 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
     272           0 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
     273           0 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     274           0 :   if( FD_UNLIKELY( !conn ) ) return; /* no conn */
     275           0 :   if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
     276           0 :   fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
     277             : 
     278           0 :   if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) {
     279           0 :     long now = fd_bundle_now();
     280           0 :     fd_keepalive_tx( ctx->keepalive, ctx->rng, now );
     281           0 :     FD_LOG_DEBUG(( "Keepalive TX (deadline=+%gs)", (double)( ctx->keepalive->ts_deadline-now )/1e9 ));
     282           0 :   }
     283           0 : }
     284             : 
     285             : int
     286             : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
     287           0 :                                  long               now ) {
     288             :   /* Drive auth */
     289           0 :   if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
     290           0 :     fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
     291           0 :     return 1;
     292           0 :   }
     293           0 :   if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0;
     294             : 
     295             :   /* Request block builder info */
     296           0 :   int const builder_info_expired = ( ctx->builder_info_valid_until - now )<0;
     297           0 :   if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
     298           0 :                      ( builder_info_expired     ) ) &
     299           0 :                    ( !ctx->builder_info_wait      ) ) ) {
     300           0 :     fd_bundle_client_request_builder_info( ctx );
     301           0 :     return 1;
     302           0 :   }
     303             : 
     304             :   /* Subscribe to packets */
     305           0 :   if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
     306           0 :     fd_bundle_client_subscribe_packets( ctx );
     307           0 :     return 1;
     308           0 :   }
     309             : 
     310             :   /* Subscribe to bundles */
     311           0 :   if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
     312           0 :     fd_bundle_client_subscribe_bundles( ctx );
     313           0 :     return 1;
     314           0 :   }
     315             : 
     316             :   /* Send a PING */
     317           0 :   if( FD_UNLIKELY( fd_keepalive_should_tx( ctx->keepalive, now ) ) ) {
     318           0 :     fd_bundle_client_send_ping( ctx );
     319           0 :     return 1;
     320           0 :   }
     321             : 
     322           0 :   return 0;
     323           0 : }
     324             : 
     325             : static void
     326             : fd_bundle_client_step1( fd_bundle_tile_t * ctx,
     327           0 :                         int *              charge_busy ) {
     328             : 
     329             :   /* Wait for TCP socket to connect */
     330           0 :   if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
     331           0 :     if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect;
     332             : 
     333           0 :     struct pollfd pfds[1] = {
     334           0 :       { .fd = ctx->tcp_sock, .events = POLLOUT }
     335           0 :     };
     336           0 :     int poll_res = fd_syscall_poll( pfds, 1, 0 );
     337           0 :     if( FD_UNLIKELY( poll_res<0 ) ) {
     338           0 :       FD_LOG_ERR(( "fd_syscall_poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     339           0 :     }
     340           0 :     if( poll_res==0 ) return;
     341             : 
     342           0 :     int connect_result = 0;
     343           0 :     if( pfds[0].revents & (POLLERR|POLLHUP) ) {
     344           0 :       connect_result = fd_bundle_client_get_connect_result( ctx );
     345           0 :     connect_failed:
     346           0 :       FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_result, fd_io_strerror( connect_result ) ));
     347           0 :       fd_bundle_client_reset( ctx );
     348           0 :       ctx->metrics.transport_fail_cnt++;
     349           0 :       *charge_busy = 1;
     350           0 :       return;
     351           0 :     }
     352           0 :     if( pfds[0].revents & POLLOUT ) {
     353           0 :       connect_result = fd_bundle_client_get_connect_result( ctx );
     354           0 :       if( FD_UNLIKELY( connect_result!=0 ) ) {
     355           0 :         goto connect_failed;
     356           0 :       }
     357           0 :       FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
     358           0 :       ctx->tcp_sock_connected = 1;
     359           0 :       *charge_busy = 1;
     360           0 :       return;
     361           0 :     }
     362           0 :     return;
     363           0 :   }
     364             : 
     365             :   /* gRPC conn died? */
     366           0 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) {
     367           0 :     long sleep_start;
     368           0 :   reconnect:
     369           0 :     sleep_start = fd_bundle_now();
     370           0 :     if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, sleep_start ) ) ) {
     371           0 :       long wait_dur = ctx->backoff_until - sleep_start;
     372           0 :       fd_log_sleep( fd_long_min( wait_dur, 1e6 ) );
     373           0 :       return;
     374           0 :     }
     375           0 :     fd_bundle_client_create_conn( ctx );
     376           0 :     *charge_busy = 1;
     377           0 :     return;
     378           0 :   }
     379             : 
     380             :   /* Did a HTTP/2 PING time out */
     381           0 :   long check_ts = ctx->cached_ts = fd_bundle_now();
     382           0 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, check_ts ) ) ) {
     383           0 :     FD_LOG_WARNING(( "Bundle gRPC timed out (HTTP/2 PING went unanswered for %.2f seconds)",
     384           0 :                      (double)( check_ts - ctx->keepalive->ts_last_tx )/1e9 ));
     385           0 :     ctx->keepalive->inflight = 0;
     386           0 :     ctx->defer_reset = 1;
     387           0 :     *charge_busy = 1;
     388           0 :     return;
     389           0 :   }
     390             : 
     391             :   /* Drive I/O, SSL handshake, and any inflight requests */
     392           0 :   if( FD_UNLIKELY( -1==fd_bundle_client_drive_io( ctx, charge_busy ) || ctx->defer_reset /* new error? */ ) ) {
     393           0 :     fd_bundle_client_reset( ctx );
     394           0 :     ctx->metrics.transport_fail_cnt++;
     395           0 :     *charge_busy = 1;
     396           0 :     return;
     397           0 :   }
     398             : 
     399             :   /* Are we ready to issue a new request? */
     400           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     401           0 :   long io_ts = fd_bundle_now();
     402           0 :   if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return;
     403             : 
     404           0 :   *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
     405           0 : }
     406             : 
     407             : static void
     408           0 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
     409           0 :   int status = fd_bundle_client_status( ctx );
     410             : 
     411           0 :   int const connected_now    = ( status==FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTED );
     412           0 :   int const connected_before = ( ctx->bundle_status_logged==FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTED );
     413             : 
     414           0 :   if( FD_UNLIKELY( connected_now!=connected_before ) ) {
     415           0 :     long ts = fd_log_wallclock();
     416           0 :     if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) {
     417           0 :       if( connected_now ) {
     418           0 :         FD_LOG_NOTICE(( "Connected to bundle server" ));
     419           0 :       } else {
     420           0 :         FD_LOG_WARNING(( "Disconnected from bundle server" ));
     421           0 :       }
     422           0 :       ctx->last_bundle_status_log_nanos = ts;
     423           0 :       ctx->bundle_status_logged = (uchar)status;
     424           0 :     }
     425           0 :   }
     426           0 : }
     427             : 
     428             : void
     429             : fd_bundle_client_step( fd_bundle_tile_t * ctx,
     430           0 :                        int *              charge_busy ) {
     431             :   /* Edge-trigger logging with rate limiting */
     432           0 :   fd_bundle_client_step1( ctx, charge_busy );
     433           0 :   fd_bundle_client_log_status( ctx );
     434           0 : }
     435             : 
     436             : void
     437             : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
     438           0 :                         long               now ) {
     439           0 :   uint iter = ctx->backoff_iter;
     440           0 :   if( now < ctx->backoff_reset ) iter = 0U;
     441           0 :   iter++;
     442             : 
     443             :   /* FIXME proper backoff */
     444           0 :   long wait_ns = (long)2e9;
     445           0 :   wait_ns = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ns, 0 ))-1UL ) );
     446             : 
     447           0 :   ctx->backoff_until = now +   wait_ns;
     448           0 :   ctx->backoff_reset = now + 2*wait_ns;
     449             : 
     450           0 :   ctx->backoff_iter = iter;
     451           0 : }
     452             : 
     453             : static void
     454           0 : fd_bundle_client_grpc_conn_established( void * app_ctx ) {
     455           0 :   (void)app_ctx;
     456           0 :   FD_LOG_INFO(( "Bundle gRPC connection established" ));
     457           0 : }
     458             : 
     459             : static void
     460             : fd_bundle_client_grpc_conn_dead( void * app_ctx,
     461             :                                  uint   h2_err,
     462           0 :                                  int    closed_by ) {
     463           0 :   fd_bundle_tile_t * ctx = app_ctx;
     464           0 :   FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
     465           0 :                 closed_by ? "by peer" : "due to error",
     466           0 :                 h2_err, fd_h2_strerror( h2_err ) ));
     467           0 :   ctx->defer_reset = 1;
     468           0 : }
     469             : 
     470             : /* Buffers a bundle transaction for deferred publishing by after_credit. */
     471             : 
     472             : static void
     473             : fd_bundle_tile_publish_bundle_txn(
     474             :     fd_bundle_tile_t * ctx,
     475             :     void const *       txn,
     476             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     477             :     ulong              bundle_txn_cnt,
     478             :     uint               source_ipv4
     479           0 : ) {
     480           0 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     481           0 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     482           0 :     return;
     483           0 :   }
     484             : 
     485           0 :   if( FD_UNLIKELY( pending_txn_full( ctx->pending_txns ) ) ) {
     486           0 :     ctx->metrics.backpressure_drop_cnt++;
     487           0 :     return;
     488           0 :   }
     489             : 
     490           0 :   fd_bundle_pending_txn_t * entry = pending_txn_push_tail_nocopy( ctx->pending_txns );
     491           0 :   fd_memcpy( entry->payload, txn, txn_sz );
     492           0 :   entry->payload_sz     = (ushort)txn_sz;
     493           0 :   entry->source_ipv4    = source_ipv4;
     494           0 :   entry->sig            = 1UL;
     495           0 :   entry->bundle_seq     = ctx->bundle_seq;
     496           0 :   entry->bundle_txn_cnt = bundle_txn_cnt;
     497           0 :   entry->commission     = (uchar)ctx->builder_commission;
     498           0 :   fd_memcpy( entry->commission_pubkey, ctx->builder_pubkey, 32UL );
     499           0 :   ctx->metrics.txn_received_cnt++;
     500           0 : }
     501             : 
     502             : /* Buffers a regular transaction for deferred publishing by after_credit. */
     503             : 
     504             : static void
     505             : fd_bundle_tile_publish_txn(
     506             :     fd_bundle_tile_t * ctx,
     507             :     void const *       txn,
     508             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     509             :     uint               source_ipv4
     510           0 : ) {
     511           0 :   if( FD_UNLIKELY( pending_txn_full( ctx->pending_txns ) ) ) {
     512           0 :     ctx->metrics.backpressure_drop_cnt++;
     513           0 :     return;
     514           0 :   }
     515             : 
     516           0 :   fd_bundle_pending_txn_t * entry = pending_txn_push_tail_nocopy( ctx->pending_txns );
     517           0 :   fd_memcpy( entry->payload, txn, txn_sz );
     518           0 :   entry->payload_sz     = (ushort)txn_sz;
     519           0 :   entry->source_ipv4    = source_ipv4;
     520           0 :   entry->sig            = 0UL;
     521           0 :   entry->bundle_seq     = 0UL;
     522           0 :   entry->bundle_txn_cnt = 1UL;
     523           0 :   entry->commission     = 0U;
     524           0 :   fd_memset( entry->commission_pubkey, 0, 32UL );
     525           0 :   ctx->metrics.txn_received_cnt++;
     526           0 : }
     527             : 
     528             : /* Called for each transaction in a bundle.  Simply counts up
     529             :    bundle_txn_cnt, but does not publish anything. */
     530             : 
     531             : static bool
     532             : fd_bundle_client_visit_pb_bundle_txn_preflight(
     533             :     pb_istream_t *     istream,
     534             :     pb_field_t const * field,
     535             :     void **            arg
     536           0 : ) {
     537           0 :   (void)istream; (void)field;
     538           0 :   fd_bundle_tile_t * ctx = *arg;
     539           0 :   ctx->bundle_txn_cnt++;
     540           0 :   return true;
     541           0 : }
     542             : 
     543             : /* Called for each transaction in a bundle.  Publishes each transaction
     544             :    to the tango message bus.  Returns false only if the Packet fails to decode; empty and oversize packets are logged, skipped, and still return true. */
     545             : 
     546             : static bool
     547             : fd_bundle_client_visit_pb_bundle_txn(
     548             :     pb_istream_t *     istream,
     549             :     pb_field_t const * field,
     550             :     void **            arg
     551           0 : ) {
     552           0 :   (void)field;
     553           0 :   fd_bundle_tile_t * ctx = *arg; /* *arg is the tile context that was stored in the pb_callback_t .arg member */
     554             : 
     555           0 :   packet_Packet packet = packet_Packet_init_default;
     556           0 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     557           0 :     ctx->metrics.decode_fail_cnt++;
     558           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     559           0 :     return false; /* a false return makes the enclosing pb_decode call fail as well */
     560           0 :   }
     561             : 
     562           0 :   if( FD_UNLIKELY( packet.data.size == 0 ) ) {
     563           0 :     FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
     564           0 :     return true; /* NOTE(review): this packet was still counted by the preflight pass, so fewer txns than bundle_txn_cnt get published -- confirm consumers tolerate that */
     565           0 :   }
     566             : 
     567           0 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     568           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     569           0 :     return true; /* NOTE(review): same counting caveat as the empty-packet case above */
     570           0 :   }
     571             : 
     572           0 :   uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : ctx->server_ip4_addr, ctx->server_ip4_addr ); /* source IP: packet.meta.addr when meta is present and fd_cstr_to_ip4_addr returns nonzero, else ctx->server_ip4_addr */
     573           0 :   fd_bundle_tile_publish_bundle_txn(
     574           0 :       ctx,
     575           0 :       packet.data.bytes, packet.data.size,
     576           0 :       ctx->bundle_txn_cnt, /* total counted by the preflight pass, not a running index */
     577           0 :       ip4
     578           0 :   );
     579             : 
     580           0 :   return true;
     581           0 : }
     582             : 
     583             : static void /* Samples the gap between a message timestamp and ctx->cached_ts into the msg_rx_delay histogram. */
     584             : fd_bundle_client_sample_rx_delay(
     585             :     fd_bundle_tile_t *                ctx,
     586             :     google_protobuf_Timestamp const * ts
     587           0 : ) {
     588           0 :   ulong tsorig = (ulong)ts->seconds*(ulong)1e9 + (ulong)ts->nanos; /* seconds and nanos folded into one nanosecond count */
     589           0 :   fd_histf_sample( ctx->metrics.msg_rx_delay, fd_ulong_sat_sub( (ulong)ctx->cached_ts, tsorig ) ); /* presumably clamps to 0 when tsorig is ahead of cached_ts -- TODO confirm fd_ulong_sat_sub semantics */
     590           0 : }
     591             : 
     592             : /* Called for each BundleUuid in a SubscribeBundlesResponse.  Decodes the bundle twice (count, then publish).  Returns false only when a decode pass fails. */
     593             : 
     594             : static bool
     595             : fd_bundle_client_visit_pb_bundle_uuid(
     596             :     pb_istream_t *     istream,
     597             :     pb_field_t const * field,
     598             :     void **            arg
     599           0 : ) {
     600           0 :   (void)field;
     601           0 :   fd_bundle_tile_t * ctx = *arg;
     602             : 
     603             :   /* Reset bundle state */
     604             : 
     605           0 :   ctx->bundle_txn_cnt = 0UL;
     606             : 
     607             :   /* Do two decode passes.  This is required because we need to know the
     608             :      number of transactions in a bundle ahead of time.  However, due to
     609             :      the Protobuf wire encoding, we don't know the number of txns that
     610             :      will come until we've parsed everything.
     611             : 
     612             :      First pass: Count number of transactions in this bundle. */
     613             : 
     614           0 :   pb_istream_t peek = *istream; /* by-value copy: the first pass decodes from this copy, the second pass from istream itself */
     615           0 :   bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
     616           0 :   bundle.bundle.packets = (pb_callback_t) {
     617           0 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
     618           0 :     .arg          = ctx
     619           0 :   };
     620           0 :   if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
     621           0 :     ctx->metrics.decode_fail_cnt++;
     622           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
     623           0 :     return false;
     624           0 :   }
     625             : 
     626             :   /* At this point, ctx->bundle_txn_cnt is correctly set.  Too many txns
     627             :      is treated as a NOP.
     628             : 
     629             :      Second pass: Actually publish bundle packets */
     630             : 
     631           0 :   if( FD_UNLIKELY( ctx->bundle_txn_cnt>FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE ) ) return true; /* oversize bundle is dropped without touching any metric */
     632             : 
     633           0 :   if( FD_UNLIKELY( pending_txn_avail( ctx->pending_txns )<ctx->bundle_txn_cnt ) ) {
     634           0 :     ctx->metrics.backpressure_drop_cnt += ctx->bundle_txn_cnt; /* the whole bundle is dropped; every txn in it is counted as a drop */
     635           0 :     return true;
     636           0 :   }
     637             : 
     638           0 :   ctx->bundle_seq++;
     639           0 :   bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default; /* start the second pass from a freshly initialized message */
     640           0 :   bundle.bundle.packets = (pb_callback_t) {
     641           0 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
     642           0 :     .arg          = ctx
     643           0 :   };
     644             : 
     645           0 :   ctx->metrics.bundle_received_cnt++; /* bumped before the second decode, so a bundle that fails mid-publish is still counted */
     646             : 
     647           0 :   if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
     648           0 :     ctx->metrics.decode_fail_cnt++;
     649           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
     650           0 :     return false;
     651           0 :   }
     652             : 
     653           0 :   fd_bundle_client_sample_rx_delay( ctx, &bundle.bundle.header.ts ); /* delay is sampled from the bundle header timestamp decoded in the second pass */
     654             : 
     655           0 :   return true;
     656           0 : }
     657             : 
     658             : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call.  Each BundleUuid entry is handed to fd_bundle_client_visit_pb_bundle_uuid.  The message is dropped if builder fee info is not available. */
     659             : 
     660             : static void
     661             : fd_bundle_client_handle_bundle_batch(
     662             :     fd_bundle_tile_t * ctx,
     663             :     pb_istream_t *     istream
     664           0 : ) {
     665           0 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     666           0 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     667           0 :     return;
     668           0 :   }
     669             : 
     670           0 :   block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
     671           0 :   res.bundles = (pb_callback_t) {
     672           0 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
     673           0 :     .arg          = ctx
     674           0 :   };
     675           0 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
     676           0 :     ctx->metrics.decode_fail_cnt++; /* the visitor also bumps this counter on its own failures before returning false */
     677           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
     678           0 :     return;
     679           0 :   }
     680           0 : }
     681             : 
     682             : /* Called for each 'Packet' (a regular transaction) of a
     683             :    SubscribePacketsResponse.  Returns false only if the Packet fails to decode; empty and oversize packets are logged and skipped. */
     684             : 
     685             : static bool
     686             : fd_bundle_client_visit_pb_packet(
     687             :     pb_istream_t *     istream,
     688             :     pb_field_t const * field,
     689             :     void **            arg
     690           0 : ) {
     691           0 :   (void)field;
     692           0 :   fd_bundle_tile_t * ctx = *arg; /* *arg is the tile context that was stored in the pb_callback_t .arg member */
     693             : 
     694           0 :   packet_Packet packet = packet_Packet_init_default;
     695           0 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     696           0 :     ctx->metrics.decode_fail_cnt++;
     697           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     698           0 :     return false;
     699           0 :   }
     700             : 
     701           0 :   if( FD_UNLIKELY( packet.data.size == 0 ) ) {
     702           0 :     FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
     703           0 :     return true;
     704           0 :   }
     705             : 
     706           0 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     707           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     708           0 :     return true;
     709           0 :   }
     710             : 
     711             : 
     712           0 :   uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : 0U, 0U ); /* source IP: packet.meta.addr when meta is present and fd_cstr_to_ip4_addr returns nonzero, else 0 (the bundle path falls back to the server address instead) */
     713           0 :   fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size, ip4 );
     714           0 :   ctx->metrics.packet_received_cnt++; /* only packets that reached fd_bundle_tile_publish_txn are counted */
     715             : 
     716           0 :   return true;
     717           0 : }
     718             : 
     719             : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call.  Each packet in res.batch.packets is handed to fd_bundle_client_visit_pb_packet. */
     720             : 
     721             : static void
     722             : fd_bundle_client_handle_packet_batch(
     723             :     fd_bundle_tile_t * ctx,
     724             :     pb_istream_t *     istream
     725           0 : ) {
     726           0 :   block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
     727           0 :   res.batch.packets = (pb_callback_t) {
     728           0 :     .funcs.decode = fd_bundle_client_visit_pb_packet,
     729           0 :     .arg          = ctx
     730           0 :   };
     731           0 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
     732           0 :     ctx->metrics.decode_fail_cnt++;
     733           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" )); /* NOTE(review): unlike the bundle path, istream->errmsg is not logged here */
     734           0 :     return;
     735           0 :   }
     736             : 
     737           0 :   fd_bundle_client_sample_rx_delay( ctx, &res.header.ts ); /* one delay sample per batch, taken from the response header timestamp */
     738           0 : }
     739             : 
     740             : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
     741             :    gRPC call.  On success stores the builder pubkey and commission in ctx and marks them valid for five minutes; on any failure the builder fields of ctx are left untouched. */
     742             : 
     743             : static void
     744             : fd_bundle_client_handle_builder_fee_info(
     745             :     fd_bundle_tile_t * ctx,
     746             :     pb_istream_t *     istream
     747           0 : ) {
     748           0 :   block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
     749           0 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
     750           0 :     ctx->metrics.decode_fail_cnt++;
     751           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
     752           0 :     return;
     753           0 :   }
     754           0 :   if( FD_UNLIKELY( res.commission > 100 ) ) { /* commission is treated as a percentage; out-of-range values are counted as decode failures */
     755           0 :     ctx->metrics.decode_fail_cnt++;
     756           0 :     FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
     757           0 :     return;
     758           0 :   }
     759             : 
     760           0 :   uchar decoded_builder_pubkey[ 32 ]; /* decode into a scratch buffer so ctx->builder_pubkey is only written after validation */
     761           0 :   if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, decoded_builder_pubkey ) ) ) {
     762           0 :     FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
     763           0 :     return; /* NOTE(review): unlike the two checks above, this path does not bump decode_fail_cnt -- confirm that is intended */
     764           0 :   }
     765             : 
     766           0 :   ctx->builder_commission = (uchar)res.commission; /* Apply update atomically */
     767           0 :   fd_memcpy( ctx->builder_pubkey, decoded_builder_pubkey, sizeof(ctx->builder_pubkey) );
     768             : 
     769           0 :   long validity_duration_ns = (long)( 60e9 * 5. ); /* 5 minutes */
     770           0 :   ctx->builder_info_avail = 1;
     771           0 :   ctx->builder_info_valid_until = fd_bundle_now() + validity_duration_ns; /* expiry is measured from the time this response is processed */
     772           0 : }
     773             : 
     774             : static void /* gRPC tx_complete callback; currently a no-op that ignores both arguments. */
     775             : fd_bundle_client_grpc_tx_complete(
     776             :     void * app_ctx,
     777             :     ulong  request_ctx
     778           0 : ) {
     779           0 :   (void)app_ctx; (void)request_ctx;
     780           0 : }
     781             : 
     782             : void /* gRPC rx_start callback: marks the matching subscription live and clears its wait flag; other request kinds are ignored. */
     783             : fd_bundle_client_grpc_rx_start(
     784             :     void * app_ctx,
     785             :     ulong  request_ctx
     786           0 : ) {
     787           0 :   fd_bundle_tile_t * ctx = app_ctx; /* app_ctx is the bundle tile context */
     788           0 :   switch( request_ctx ) {
     789           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     790           0 :     ctx->packet_subscription_live = 1;
     791           0 :     ctx->packet_subscription_wait = 0;
     792           0 :     break;
     793           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     794           0 :     ctx->bundle_subscription_live = 1;
     795           0 :     ctx->bundle_subscription_wait = 0;
     796           0 :     break;
     797           0 :   } /* no default case: any other request_ctx leaves ctx unchanged */
     798           0 : }
     799             : 
     800             : void /* gRPC rx_msg callback: accounts the received bytes and dispatches the Protobuf payload to the handler selected by request_ctx. */
     801             : fd_bundle_client_grpc_rx_msg(
     802             :     void *       app_ctx,
     803             :     void const * protobuf,
     804             :     ulong        protobuf_sz,
     805             :     ulong        request_ctx
     806           0 : ) {
     807           0 :   fd_bundle_tile_t * ctx = app_ctx;
     808           0 :   ctx->metrics.proto_received_bytes += protobuf_sz; /* counted for every message, including ones that later fail to decode */
     809           0 :   pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz ); /* used by the three Bundle_* handlers; the auth handlers take the raw buffer instead */
     810           0 :   switch( request_ctx ) {
     811           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     812           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     813           0 :       ctx->metrics.decode_fail_cnt++;
     814           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     815           0 :     }
     816           0 :     break;
     817           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     818           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     819           0 :       ctx->metrics.decode_fail_cnt++;
     820           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     821           0 :     }
     822           0 :     break;
     823           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     824           0 :     fd_bundle_client_handle_bundle_batch( ctx, &istream );
     825           0 :     break;
     826           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     827           0 :     fd_bundle_client_handle_packet_batch( ctx, &istream );
     828           0 :     break;
     829           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     830           0 :     fd_bundle_client_handle_builder_fee_info( ctx, &istream );
     831           0 :     break;
     832           0 :   default:
     833           0 :     FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx )); /* NOTE(review): FD_LOG_ERR presumably does not return (no break follows) -- confirm */
     834           0 :   }
     835           0 : }
     836             : 
     837             : static void /* Shared failure path: always triggers a backoff, and additionally notifies the auther when an auth request failed. */
     838             : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
     839           0 :                                  ulong              request_ctx ) {
     840           0 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     841           0 :   switch( request_ctx ) {
     842           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge: /* both auth request kinds share the handler below */
     843           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     844           0 :     fd_bundle_auther_handle_request_fail( &ctx->auther );
     845           0 :     break;
     846           0 :   } /* no default case: non-auth requests only get the backoff */
     847           0 : }
     848             : 
     849             : void /* gRPC rx_end callback: classifies the end of a response (HTTP error, ended subscription stream, gRPC error status) and updates ctx accordingly. */
     850             : fd_bundle_client_grpc_rx_end(
     851             :     void *                app_ctx,
     852             :     ulong                 request_ctx,
     853             :     fd_grpc_resp_hdrs_t * resp
     854           0 : ) {
     855           0 :   fd_bundle_tile_t * ctx = app_ctx;
     856           0 :   if( FD_UNLIKELY( resp->h2_status!=200 ) ) { /* HTTP-level failure: handled before any per-request state is touched */
     857           0 :     FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
     858           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     859           0 :     return;
     860           0 :   }
     861             : 
     862           0 :   resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len ); /* presumably unescapes grpc_msg in place and returns the new length -- TODO confirm */
     863           0 :   if( !resp->grpc_msg_len ) {
     864           0 :     fd_memcpy( resp->grpc_msg, "unknown error", 13 ); /* 13 is the length of the literal; no NUL terminator is copied */
     865           0 :     resp->grpc_msg_len = 13;
     866           0 :   }
     867             : 
     868           0 :   switch( request_ctx ) {
     869           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets: /* a subscription stream ending is always handled as a failure here; grpc_status is only logged */
     870           0 :     ctx->packet_subscription_live = 0;
     871           0 :     ctx->packet_subscription_wait = 0;
     872           0 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     873           0 :     ctx->defer_reset = 1; /* presumably requests a client reset outside this callback -- verify where defer_reset is consumed */
     874           0 :     FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
     875           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     876           0 :     return;
     877           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles: /* mirrors the SubscribePackets case above */
     878           0 :     ctx->bundle_subscription_live = 0;
     879           0 :     ctx->bundle_subscription_wait = 0;
     880           0 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     881           0 :     ctx->defer_reset = 1;
     882           0 :     FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
     883           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     884           0 :     return;
     885           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     886           0 :     ctx->builder_info_wait = 0; /* cleared whether or not the request succeeded */
     887           0 :     break;
     888           0 :   default:
     889           0 :     break;
     890           0 :   }
     891             : 
     892           0 :   if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) { /* reached only for non-subscription requests */
     893           0 :     FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
     894           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
     895           0 :                   (int)resp->grpc_msg_len, resp->grpc_msg ));
     896           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     897           0 :     if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
     898           0 :         resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
     899           0 :       fd_bundle_auther_reset( &ctx->auther ); /* auth-related statuses additionally reset the auther state */
     900           0 :     }
     901           0 :     return;
     902           0 :   }
     903           0 : }
     904             : 
     905             : void /* gRPC rx_timeout callback: logs the timed-out request and sets ctx->defer_reset. */
     906             : fd_bundle_client_grpc_rx_timeout(
     907             :     void * app_ctx,
     908             :     ulong  request_ctx,  /* FD_BUNDLE_CLIENT_REQ_{...} */
     909             :     int    deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
     910           0 : ) {
     911           0 :   (void)deadline_kind; /* both deadline kinds are handled identically */
     912           0 :   FD_LOG_WARNING(( "Request timed out: %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
     913           0 :   fd_bundle_tile_t * ctx = app_ctx;
     914           0 :   ctx->defer_reset = 1; /* presumably requests a client reset outside this callback -- verify where defer_reset is consumed */
     915           0 : }
     916             : 
     917             : static void /* gRPC ping_ack callback: feeds the keepalive tracker, records an RTT sample when one is returned, and counts the ACK. */
     918           0 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
     919           0 :   fd_bundle_tile_t * ctx = app_ctx;
     920           0 :   long rtt_sample = fd_keepalive_rx( ctx->keepalive, fd_bundle_now() );
     921           0 :   if( FD_LIKELY( rtt_sample ) ) { /* a zero return is treated as no sample and skipped */
     922           0 :     fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
     923           0 :     FD_LOG_DEBUG(( "Keepalive ACK" ));
     924           0 :   }
     925           0 :   ctx->metrics.ping_ack_cnt++; /* counted for every ACK, with or without an RTT sample */
     926           0 : }
     927             : 
     928             : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = { /* gRPC client callback table wiring each event to the fd_bundle_client_grpc_* handlers of this file */
     929             :   .conn_established = fd_bundle_client_grpc_conn_established,
     930             :   .conn_dead        = fd_bundle_client_grpc_conn_dead,
     931             :   .tx_complete      = fd_bundle_client_grpc_tx_complete, /* no-op handler */
     932             :   .rx_start         = fd_bundle_client_grpc_rx_start,
     933             :   .rx_msg           = fd_bundle_client_grpc_rx_msg,
     934             :   .rx_end           = fd_bundle_client_grpc_rx_end,
     935             :   .rx_timeout       = fd_bundle_client_grpc_rx_timeout,
     936             :   .ping_ack         = fd_bundle_client_grpc_ping_ack,
     937             : };
     938             : 
     939             : int /* Returns one of FD_BUNDLE_BLOCK_ENGINE_STATUS_{DISCONNECTED,CONNECTING,CONNECTED}, decided by the first failing check below. */
     940           0 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
     941           0 :   if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) | /* bitwise |, so both operands are always evaluated */
     942           0 :                    ( !ctx->grpc_client        ) ) ) {
     943           0 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED;
     944           0 :   }
     945             : 
     946           0 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     947           0 :   if( FD_UNLIKELY( !conn ) ) {
     948           0 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED; /* no conn */
     949           0 :   }
     950           0 :   if( FD_UNLIKELY( conn->flags &
     951           0 :       ( FD_H2_CONN_FLAGS_DEAD |
     952           0 :         FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
     953           0 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED; /* either flag set means the conn is reported as disconnected */
     954           0 :   }
     955             : 
     956           0 :   if( FD_UNLIKELY( conn->flags &
     957           0 :       ( FD_H2_CONN_FLAGS_CLIENT_INITIAL      |
     958           0 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
     959           0 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_0     |
     960           0 :         FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
     961           0 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING; /* connection is not ready */
     962           0 :   }
     963             : 
     964           0 :   if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
     965           0 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING; /* not authenticated */
     966           0 :   }
     967             : 
     968           0 :   if( FD_UNLIKELY( ( !ctx->builder_info_avail       ) | /* all three must be set before the client counts as connected */
     969           0 :                    ( !ctx->packet_subscription_live ) |
     970           0 :                    ( !ctx->bundle_subscription_live ) ) ) {
     971           0 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING; /* not fully connected */
     972           0 :   }
     973             : 
     974           0 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, fd_bundle_now() ) ) ) {
     975           0 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED; /* possible timeout */
     976           0 :   }
     977             : 
     978           0 :   if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
     979           0 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING;
     980           0 :   }
     981             : 
     982             :   /* As far as we know, the bundle connection is alive and well. */
     983           0 :   return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTED;
     984           0 : }
     985             : 
     986             : #undef DISCONNECTED
     987             : #undef CONNECTING
     988             : #undef CONNECTED
     989             : 
     990             : FD_FN_CONST char const * /* Maps a FD_BUNDLE_CLIENT_REQ_* value to a string literal naming the request; any other value yields "unknown". */
     991           0 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
     992           0 :   switch( request_ctx ) {
     993           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     994           0 :     return "GenerateAuthChallenge";
     995           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     996           0 :     return "GenerateAuthTokens";
     997           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     998           0 :     return "SubscribePackets";
     999           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
    1000           0 :     return "SubscribeBundles";
    1001           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
    1002           0 :     return "GetBlockBuilderFeeInfo";
    1003           0 :   default:
    1004           0 :     return "unknown";
    1005           0 :   }
    1006           0 : }

Generated by: LCOV version 1.14