LCOV - code coverage report
Current view: top level - flamenco/gossip - fd_gossip.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 556 0.0 %
Date: 2026-03-19 18:19:27 Functions: 0 35 0.0 %

          Line data    Source code
       1             : #include "fd_gossip.h"
       2             : #include "fd_bloom.h"
       3             : #include "fd_gossip_message.h"
       4             : #include "fd_gossip_txbuild.h"
       5             : #include "fd_active_set.h"
       6             : #include "fd_ping_tracker.h"
       7             : #include "fd_prune_finder.h"
       8             : #include "fd_gossip_wsample.h"
       9             : #include "../../disco/keyguard/fd_keyguard.h"
      10             : #include "../../ballet/sha256/fd_sha256.h"
      11             : #include "../leaders/fd_leaders_base.h"
      12             : 
      13             : FD_STATIC_ASSERT( FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT==FD_GOSSIP_MESSAGE_CNT,
      14             :                   "FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT must match FD_GOSSIP_MESSAGE_CNT" );
      15             : 
      16             : FD_STATIC_ASSERT( FD_METRICS_ENUM_CRDS_VALUE_CNT==FD_GOSSIP_VALUE_CNT,
      17             :                   "FD_METRICS_ENUM_CRDS_VALUE_CNT must match FD_GOSSIP_VALUE_CNT" );
      18             : 
      19           0 : #define BLOOM_FALSE_POSITIVE_RATE (0.1)
      20           0 : #define BLOOM_NUM_KEYS            (8.0)
      21             : 
      22             : struct stake {
      23             :   fd_pubkey_t pubkey;
      24             :   ulong       stake;
      25             : 
      26             :   struct {
      27             :     ulong prev;
      28             :     ulong next;
      29             :   } map;
      30             : 
      31             :   struct {
      32             :     ulong next;
      33             :   } pool;
      34             : };
      35             : 
      36             : typedef struct stake stake_t;
      37             : 
      38             : /* NOTE: Since the staked count is known at the time we populate
      39             :    the map, we can treat the pool as an array instead. This means we
      40             :    can bypass the acquire/release model and quickly iterate through the
      41             :    pool when we repopulate the map on every fd_gossip_stakes_update
      42             :    iteration. */
      43             : #define POOL_NAME  stake_pool
      44           0 : #define POOL_T     stake_t
      45             : #define POOL_IDX_T ulong
      46           0 : #define POOL_NEXT  pool.next
      47             : #include "../../util/tmpl/fd_pool.c"
      48             : 
      49             : #define MAP_NAME               stake_map
      50           0 : #define MAP_KEY                pubkey
      51             : #define MAP_ELE_T              stake_t
      52             : #define MAP_KEY_T              fd_pubkey_t
      53           0 : #define MAP_PREV               map.prev
      54           0 : #define MAP_NEXT               map.next
      55           0 : #define MAP_KEY_EQ(k0,k1)      fd_pubkey_eq( k0, k1 )
      56           0 : #define MAP_KEY_HASH(key,seed) (seed^fd_ulong_load_8( (key)->uc ))
      57             : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
      58             : #include "../../util/tmpl/fd_map_chain.c"
      59             : 
      60             : struct fd_gossip_private {
      61             :   uchar               identity_pubkey[ 32UL ];
      62             :   ulong               identity_stake;
      63             : 
      64             :   fd_gossip_metrics_t metrics[1];
      65             : 
      66             :   fd_gossip_wsample_t * wsample;
      67             :   fd_crds_t *           crds;
      68             :   fd_gossip_purged_t *  purged;
      69             :   fd_active_set_t *     active_set;
      70             :   fd_ping_tracker_t *   ping_tracker;
      71             :   fd_prune_finder_t *   prune_finder;
      72             : 
      73             :   fd_sha256_t sha256[1];
      74             :   fd_sha512_t sha512[1];
      75             : 
      76             :   ulong         entrypoints_cnt;
      77             :   fd_ip4_port_t entrypoints[ 16UL ];
      78             : 
      79             :   fd_rng_t * rng;
      80             : 
      81             :   struct {
      82             :     ulong         count;
      83             :     stake_t *     pool;
      84             :     stake_map_t * map;
      85             :   } stake;
      86             : 
      87             :   struct {
      88             :     long next_pull_request;
      89             :     long next_active_set_refresh;
      90             :     long next_contact_info_refresh;
      91             :     long next_flush_push_state;
      92             :   } timers;
      93             : 
      94             :   /* Token-bucket rate limiter for outbound pull response data.
      95             :      Matches Agave's DataBudget: replenished every 100ms with
      96             :      num_staked*1024 bytes, capped at 5x that amount.  Only
      97             :      pull responses are rate-limited; push messages are not. */
      98             :   struct {
      99             :     ulong remaining;           /* bytes remaining in budget (signed) */
     100             :     long last_replenish_nanos; /* last replenish timestamp in nanos  */
     101             :   } outbound_budget;
     102             : 
     103             :   /* Callbacks */
     104             :   fd_gossip_sign_fn   sign_fn;
     105             :   void *              sign_ctx;
     106             : 
     107             :   fd_gossip_send_fn   send_fn;
     108             :   void *              send_ctx;
     109             : 
     110             :   fd_ping_tracker_change_fn ping_tracker_change_fn;
     111             :   void *                    ping_tracker_change_fn_ctx;
     112             : 
     113             :   struct {
     114             :     uchar             crds_val[ FD_GOSSIP_VALUE_MAX_SZ ];
     115             :     ulong             crds_val_sz;
     116             :     fd_gossip_value_t ci[1];
     117             :   } my_contact_info;
     118             : 
     119             :   fd_gossip_out_ctx_t * gossip_net_out;
     120             : };
     121             : 
     122             : FD_FN_CONST ulong
     123           0 : fd_gossip_align( void ) {
     124           0 :   return 128uL;
     125           0 : }
     126             : 
     127             : FD_FN_CONST ulong
     128             : fd_gossip_footprint( ulong max_values,
     129           0 :                      ulong entrypoints_len ) {
     130           0 :   ulong l;
     131           0 :   l = FD_LAYOUT_INIT;
     132           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_t),     sizeof(fd_gossip_t)                                                  );
     133           0 :   l = FD_LAYOUT_APPEND( l, fd_gossip_purged_align(), fd_gossip_purged_footprint( max_values )                             );
     134           0 :   l = FD_LAYOUT_APPEND( l, fd_gossip_wsample_align(),fd_gossip_wsample_footprint( FD_CONTACT_INFO_TABLE_SIZE )            );
     135           0 :   l = FD_LAYOUT_APPEND( l, fd_crds_align(),          fd_crds_footprint( max_values )                                      );
     136           0 :   l = FD_LAYOUT_APPEND( l, fd_active_set_align(),    fd_active_set_footprint()                                            );
     137           0 :   l = FD_LAYOUT_APPEND( l, fd_ping_tracker_align(),  fd_ping_tracker_footprint( entrypoints_len )                         );
     138           0 :   l = FD_LAYOUT_APPEND( l, fd_prune_finder_align(),  fd_prune_finder_footprint()                                          );
     139           0 :   l = FD_LAYOUT_APPEND( l, stake_pool_align(),       stake_pool_footprint( MAX_STAKED_LEADERS )                           );
     140           0 :   l = FD_LAYOUT_APPEND( l, stake_map_align(),        stake_map_footprint( stake_map_chain_cnt_est( MAX_STAKED_LEADERS ) ) );
     141           0 :   l = FD_LAYOUT_FINI( l, fd_gossip_align() );
     142           0 :   return l;
     143           0 : }
     144             : 
     145             : static void
     146             : ping_tracker_change( void *        _ctx,
     147             :                      uchar const * peer_pubkey,
     148             :                      fd_ip4_port_t peer_address,
     149             :                      long          now,
     150           0 :                      int           change_type ) {
     151           0 :   fd_gossip_t * ctx = (fd_gossip_t *)_ctx;
     152             : 
     153           0 :   if( FD_UNLIKELY( !memcmp( peer_pubkey, ctx->identity_pubkey, 32UL ) ) ) return;
     154             : 
     155           0 :   if( FD_LIKELY( change_type==FD_PING_TRACKER_CHANGE_TYPE_ACTIVE ) ) {
     156           0 :     fd_gossip_purged_drain_no_contact_info( ctx->purged, peer_pubkey );
     157           0 :   }
     158             : 
     159           0 :   ulong ci_idx = fd_crds_ci_idx( ctx->crds, peer_pubkey );
     160           0 :   if( FD_UNLIKELY( ci_idx!=ULONG_MAX ) ) {
     161           0 :     switch( change_type ) {
     162           0 :       case FD_PING_TRACKER_CHANGE_TYPE_ACTIVE:
     163           0 :         fd_gossip_wsample_ping_tracked( ctx->wsample, ci_idx, 1 );
     164           0 :         break;
     165           0 :       case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE:
     166           0 :       case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE_STAKED:
     167           0 :         fd_gossip_wsample_ping_tracked( ctx->wsample, ci_idx, 0 );
     168           0 :         fd_active_set_remove_peer( ctx->active_set, ci_idx );
     169           0 :         break;
     170           0 :       default: FD_LOG_ERR(( "Unknown change type %d", change_type )); return;
     171           0 :     }
     172           0 :   }
     173             : 
     174           0 :   ctx->ping_tracker_change_fn( ctx->ping_tracker_change_fn_ctx, peer_pubkey, peer_address, now, change_type );
     175           0 : }
     176             : 
     177             : static inline void
     178             : refresh_contact_info( fd_gossip_t * gossip,
     179           0 :                       long          now ) {
     180           0 :   fd_memcpy( gossip->my_contact_info.ci->origin, gossip->identity_pubkey, 32UL );
     181           0 :   gossip->my_contact_info.ci->wallclock = (ulong)FD_NANOSEC_TO_MILLI( now );
     182           0 :   long sz = fd_gossip_value_serialize( gossip->my_contact_info.ci, gossip->my_contact_info.crds_val, FD_GOSSIP_VALUE_MAX_SZ );
     183           0 :   FD_TEST( sz!=-1L );
     184           0 :   gossip->my_contact_info.crds_val_sz = (ulong)sz;
     185             : 
     186           0 :   gossip->sign_fn( gossip->sign_ctx,
     187           0 :                    gossip->my_contact_info.crds_val+64UL,
     188           0 :                    gossip->my_contact_info.crds_val_sz-64UL,
     189           0 :                    FD_KEYGUARD_SIGN_TYPE_ED25519,
     190           0 :                    gossip->my_contact_info.crds_val );
     191             : 
     192             :   /* We don't have stem_ctx here so we pre-empt in next
     193             :      fd_gossip_advance iteration instead. */
     194           0 :   gossip->timers.next_contact_info_refresh = now;
     195           0 : }
     196             : 
     197             : void *
     198             : fd_gossip_new( void *                           shmem,
     199             :                fd_rng_t *                       rng,
     200             :                ulong                            max_values,
     201             :                ulong                            entrypoints_len,
     202             :                fd_ip4_port_t const *            entrypoints,
     203             :                uchar const *                    identity_pubkey,
     204             :                fd_gossip_contact_info_t const * my_contact_info,
     205             :                long                             now,
     206             :                fd_gossip_send_fn                send_fn,
     207             :                void *                           send_ctx,
     208             :                fd_gossip_sign_fn                sign_fn,
     209             :                void *                           sign_ctx,
     210             :                fd_ping_tracker_change_fn        ping_tracker_change_fn,
     211             :                void *                           ping_tracker_change_fn_ctx,
     212             :                fd_gossip_activity_update_fn     activity_update_fn,
     213             :                void *                           activity_update_fn_ctx,
     214             :                fd_gossip_out_ctx_t *            gossip_update_out,
     215           0 :                fd_gossip_out_ctx_t *            gossip_net_out ) {
     216           0 :   if( FD_UNLIKELY( !shmem ) ) {
     217           0 :     FD_LOG_WARNING(( "NULL shmem" ));
     218           0 :     return NULL;
     219           0 :   }
     220             : 
     221           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_gossip_align() ) ) ) {
     222           0 :     FD_LOG_WARNING(( "misaligned shmem" ));
     223           0 :     return NULL;
     224           0 :   }
     225             : 
     226           0 :   if( FD_UNLIKELY( entrypoints_len>16UL ) ) {
     227           0 :     FD_LOG_WARNING(( "entrypoints_cnt must be in [0, 16]" ));
     228           0 :     return NULL;
     229           0 :   }
     230             : 
     231           0 :   if( FD_UNLIKELY( !fd_ulong_is_pow2( max_values ) ) ) {
     232           0 :     FD_LOG_WARNING(( "max_values must be a power of 2" ));
     233           0 :     return NULL;
     234           0 :   }
     235             : 
     236           0 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
     237           0 :   fd_gossip_t * gossip  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t),      sizeof(fd_gossip_t)                                                  );
     238           0 :   void * purged         = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_purged_align(),  fd_gossip_purged_footprint( max_values )                             );
     239           0 :   void * wsample        = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_wsample_align(), fd_gossip_wsample_footprint( FD_CONTACT_INFO_TABLE_SIZE )            );
     240           0 :   void * crds           = FD_SCRATCH_ALLOC_APPEND( l, fd_crds_align(),           fd_crds_footprint( max_values )                                      );
     241           0 :   void * active_set     = FD_SCRATCH_ALLOC_APPEND( l, fd_active_set_align(),     fd_active_set_footprint()                                            );
     242           0 :   void * ping_tracker   = FD_SCRATCH_ALLOC_APPEND( l, fd_ping_tracker_align(),   fd_ping_tracker_footprint( entrypoints_len )                         );
     243           0 :   void * prune_finder   = FD_SCRATCH_ALLOC_APPEND( l, fd_prune_finder_align(),   fd_prune_finder_footprint()                                          );
     244           0 :   void * stake_pool     = FD_SCRATCH_ALLOC_APPEND( l, stake_pool_align(),        stake_pool_footprint( MAX_STAKED_LEADERS )                           );
     245           0 :   void * stake_weights  = FD_SCRATCH_ALLOC_APPEND( l, stake_map_align(),         stake_map_footprint( stake_map_chain_cnt_est( MAX_STAKED_LEADERS ) ) );
     246             : 
     247           0 :   gossip->gossip_net_out  = gossip_net_out;
     248             : 
     249           0 :   gossip->entrypoints_cnt = entrypoints_len;
     250           0 :   fd_memcpy( gossip->entrypoints, entrypoints, entrypoints_len*sizeof(fd_ip4_port_t) );
     251             : 
     252           0 :   gossip->purged = fd_gossip_purged_join( fd_gossip_purged_new( purged, rng, max_values ) );
     253           0 :   FD_TEST( gossip->purged );
     254             : 
     255           0 :   gossip->wsample = fd_gossip_wsample_join( fd_gossip_wsample_new( wsample, rng, FD_CONTACT_INFO_TABLE_SIZE ) );
     256           0 :   FD_TEST( gossip->wsample );
     257             : 
     258           0 :   gossip->crds = fd_crds_join( fd_crds_new( crds, entrypoints, entrypoints_len, gossip->wsample, active_set, rng, max_values, gossip->purged, activity_update_fn, activity_update_fn_ctx, gossip_update_out ) );
     259           0 :   FD_TEST( gossip->crds );
     260             : 
     261           0 :   gossip->active_set = fd_active_set_join( fd_active_set_new( active_set, gossip->wsample, gossip->crds, rng, identity_pubkey, 0UL, send_fn, send_ctx ) );
     262           0 :   FD_TEST( gossip->active_set );
     263             : 
     264           0 :   gossip->ping_tracker = fd_ping_tracker_join( fd_ping_tracker_new( ping_tracker, rng, gossip->entrypoints_cnt, gossip->entrypoints, ping_tracker_change, gossip ) );
     265           0 :   FD_TEST( gossip->ping_tracker );
     266             : 
     267           0 :   gossip->prune_finder = fd_prune_finder_join( fd_prune_finder_new( prune_finder ) );
     268           0 :   FD_TEST( gossip->prune_finder );
     269             : 
     270           0 :   gossip->stake.count = 0UL;
     271           0 :   gossip->stake.pool = stake_pool_join( stake_pool_new( stake_pool, MAX_STAKED_LEADERS ) );
     272           0 :   FD_TEST( gossip->stake.pool );
     273             : 
     274           0 :   gossip->stake.map = stake_map_join( stake_map_new( stake_weights, stake_map_chain_cnt_est( MAX_STAKED_LEADERS ), fd_rng_ulong( rng ) ) );
     275           0 :   FD_TEST( gossip->stake.map );
     276             : 
     277           0 :   FD_TEST( fd_sha256_join( fd_sha256_new( gossip->sha256 ) ) );
     278           0 :   FD_TEST( fd_sha512_join( fd_sha512_new( gossip->sha512 ) ) );
     279             : 
     280           0 :   gossip->rng = rng;
     281             : 
     282           0 :   gossip->timers.next_pull_request = 0L;
     283           0 :   gossip->timers.next_active_set_refresh = 0L;
     284           0 :   gossip->timers.next_contact_info_refresh = 0L;
     285           0 :   gossip->timers.next_flush_push_state = 0L;
     286             : 
     287           0 :   gossip->outbound_budget.remaining            = 0UL;
     288           0 :   gossip->outbound_budget.last_replenish_nanos = now;
     289             : 
     290           0 :   gossip->send_fn  = send_fn;
     291           0 :   gossip->send_ctx = send_ctx;
     292           0 :   gossip->sign_fn  = sign_fn;
     293           0 :   gossip->sign_ctx = sign_ctx;
     294           0 :   gossip->ping_tracker_change_fn     = ping_tracker_change_fn;
     295           0 :   gossip->ping_tracker_change_fn_ctx = ping_tracker_change_fn_ctx;
     296             : 
     297           0 :   gossip->my_contact_info.ci->tag = FD_GOSSIP_VALUE_CONTACT_INFO;
     298           0 :   *gossip->my_contact_info.ci->contact_info = *my_contact_info;
     299           0 :   fd_memcpy( gossip->identity_pubkey, identity_pubkey, 32UL );
     300           0 :   gossip->identity_stake = 0UL;
     301           0 :   refresh_contact_info( gossip, now );
     302             : 
     303           0 :   fd_memset( gossip->metrics, 0, sizeof(fd_gossip_metrics_t) );
     304             : 
     305           0 :   return gossip;
     306           0 : }
     307             : 
     308             : fd_gossip_t *
     309           0 : fd_gossip_join( void * shgossip ) {
     310           0 :   if( FD_UNLIKELY( !shgossip ) ) {
     311           0 :     FD_LOG_WARNING(( "NULL shgossip" ));
     312           0 :     return NULL;
     313           0 :   }
     314             : 
     315           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgossip, fd_gossip_align() ) ) ) {
     316           0 :     FD_LOG_WARNING(( "misaligned shgossip" ));
     317           0 :     return NULL;
     318           0 :   }
     319             : 
     320           0 :   return (fd_gossip_t *)shgossip;
     321           0 : }
     322             : 
     323             : fd_gossip_metrics_t const *
     324           0 : fd_gossip_metrics( fd_gossip_t const * gossip ) {
     325           0 :   return gossip->metrics;
     326           0 : }
     327             : 
     328             : fd_crds_metrics_t const *
     329           0 : fd_gossip_crds_metrics( fd_gossip_t const * gossip ) {
     330           0 :   return fd_crds_metrics( gossip->crds );
     331           0 : }
     332             : 
     333             : fd_ping_tracker_metrics_t const *
     334           0 : fd_gossip_ping_tracker_metrics( fd_gossip_t const * gossip ) {
     335           0 :   return fd_ping_tracker_metrics( gossip->ping_tracker );
     336           0 : }
     337             : 
     338             : fd_gossip_purged_metrics_t const *
     339           0 : fd_gossip_purged_metrics2( fd_gossip_t const * gossip ) {
     340           0 :   return fd_gossip_purged_metrics( gossip->purged );
     341           0 : }
     342             : 
     343             : fd_active_set_metrics_t const *
     344           0 : fd_gossip_active_set_metrics2( fd_gossip_t const * gossip ) {
     345           0 :   return fd_active_set_metrics( gossip->active_set );
     346           0 : }
     347             : 
     348             : static fd_ip4_port_t
     349           0 : random_entrypoint( fd_gossip_t const * gossip ) {
     350           0 :   ulong idx = fd_rng_ulong_roll( gossip->rng, gossip->entrypoints_cnt );
     351           0 :   return gossip->entrypoints[ idx ];
     352           0 : }
     353             : 
     354             : ulong
     355             : get_stake( fd_gossip_t const * gossip,
     356           0 :            uchar const *       pubkey ) {
     357           0 :   stake_t const * entry = stake_map_ele_query_const( gossip->stake.map, (fd_pubkey_t const *)pubkey, NULL, gossip->stake.pool );
     358           0 :   if( FD_UNLIKELY( !entry ) ) return 0UL;
     359           0 :   return entry->stake;
     360           0 : }
     361             : 
     362             : void
     363             : fd_gossip_set_identity( fd_gossip_t * gossip,
     364             :                         uchar const * identity_pubkey,
     365           0 :                         long          now ) {
     366           0 :   int identity_changed = memcmp( gossip->identity_pubkey, identity_pubkey, 32UL );
     367           0 :   if( FD_UNLIKELY( !identity_changed ) ) return;
     368             : 
     369           0 :   ulong new_ci_idx = fd_crds_ci_idx( gossip->crds, identity_pubkey );
     370             : 
     371             :   /* The new identity may already exist in CRDS as a normal peer (active
     372             :      in the wsample and potentially present in the active set).  We
     373             :      must deactivate it before updating identity_pubkey to maintain the
     374             :      invariant that our own identity is never sampleable. */
     375           0 :   if( FD_UNLIKELY( new_ci_idx!=ULONG_MAX ) ) fd_active_set_remove_peer( gossip->active_set, new_ci_idx );
     376             : 
     377           0 :   fd_memcpy( gossip->identity_pubkey, identity_pubkey, 32UL );
     378           0 :   gossip->identity_stake = get_stake( gossip, identity_pubkey );
     379           0 :   fd_gossip_wsample_set_identity( gossip->wsample, new_ci_idx );
     380           0 :   fd_gossip_wsample_self_stake( gossip->wsample, gossip->identity_stake );
     381           0 :   fd_active_set_set_identity( gossip->active_set, gossip->identity_pubkey, gossip->identity_stake );
     382           0 :   fd_prune_finder_set_identity( gossip->prune_finder, gossip->identity_pubkey, gossip->identity_stake );
     383           0 :   refresh_contact_info( gossip, now );
     384           0 : }
     385             : 
     386             : void
     387             : fd_gossip_set_shred_version( fd_gossip_t * gossip,
     388             :                              ushort        shred_version,
     389           0 :                              long          now ) {
     390           0 :   gossip->my_contact_info.ci->contact_info->shred_version = shred_version;
     391           0 :   refresh_contact_info( gossip, now );
     392           0 : }
     393             : 
     394             : void
     395             : fd_gossip_stakes_update( fd_gossip_t *                  gossip,
     396             :                          fd_vote_stake_weight_t const * stake_weights,
     397           0 :                          ulong                          stake_weights_cnt ) {
     398           0 :   stake_map_reset( gossip->stake.map );
     399           0 :   stake_pool_reset( gossip->stake.pool );
     400             : 
     401           0 :   for( ulong i=0UL; i<stake_weights_cnt; i++ ) {
     402           0 :     if( FD_UNLIKELY( fd_pubkey_eq( &stake_weights[i].id_key, &FD_DUMMY_ACCOUNT_PUBKEY ) ) ) continue;
     403           0 :     stake_t * entry;
     404           0 :     if( FD_UNLIKELY( (entry = stake_map_ele_query( gossip->stake.map, &stake_weights[i].id_key, NULL, gossip->stake.pool )) ) ) {
     405           0 :       entry->stake += stake_weights[ i ].stake;
     406           0 :     } else {
     407           0 :       entry = stake_pool_ele_acquire( gossip->stake.pool );
     408           0 :       fd_memcpy( entry->pubkey.uc, stake_weights[ i ].id_key.uc, 32UL );
     409           0 :       entry->stake = stake_weights[ i ].stake;
     410           0 :       stake_map_ele_insert( gossip->stake.map, entry, gossip->stake.pool );
     411           0 :     }
     412           0 :   }
     413             : 
     414           0 :   gossip->identity_stake = get_stake( gossip, gossip->identity_pubkey );
     415           0 :   fd_gossip_wsample_self_stake( gossip->wsample, gossip->identity_stake );
     416           0 :   fd_active_set_set_identity( gossip->active_set, gossip->identity_pubkey, gossip->identity_stake );
     417           0 :   fd_prune_finder_set_identity( gossip->prune_finder, gossip->identity_pubkey, gossip->identity_stake );
     418           0 :   gossip->stake.count = stake_pool_used( gossip->stake.pool );
     419           0 : }
     420             : 
     421             : /* Outbound data budget constants (matching Agave's DataBudget for gossip).
     422             :    Budget is replenished every BUDGET_REPLENISH_INTERVAL_NS with
     423             :    num_staked * BUDGET_BYTES_PER_INTERVAL bytes, capped at
     424             :    BUDGET_MAX_MULTIPLE * num_staked * BUDGET_BYTES_PER_INTERVAL. */
     425             : 
     426             : #define BUDGET_REPLENISH_INTERVAL_NS (100L*1000L*1000L) /* 100 ms */
     427           0 : #define BUDGET_BYTES_PER_INTERVAL    (1024UL)           /* per staked validator */
     428           0 : #define BUDGET_MAX_MULTIPLE          (5UL)              /* max accumulation */
     429           0 : #define BUDGET_MIN_STAKED            (2UL)              /* floor for num_staked */
     430             : 
     431             : /* Lazily replenish the outbound pull-response budget if at least
     432             :    BUDGET_REPLENISH_INTERVAL_NS have elapsed since last replenish.
     433             :    Returns current remaining budget in bytes. */
     434             : 
     435             : static inline ulong
     436             : outbound_budget_replenish( fd_gossip_t * gossip,
     437           0 :                            long          now ) {
     438           0 :   long elapsed = now-gossip->outbound_budget.last_replenish_nanos;
     439             : 
     440           0 :   if( FD_LIKELY( elapsed>=BUDGET_REPLENISH_INTERVAL_NS ) ) {
     441           0 :     ulong num_staked = fd_ulong_max( gossip->stake.count, BUDGET_MIN_STAKED );
     442           0 :     ulong increment  = num_staked * BUDGET_BYTES_PER_INTERVAL;
     443           0 :     ulong cap        = BUDGET_MAX_MULTIPLE * increment;
     444           0 :     ulong remaining  = gossip->outbound_budget.remaining + increment;
     445           0 :     gossip->outbound_budget.remaining            = fd_ulong_min( remaining, cap );
     446           0 :     gossip->outbound_budget.last_replenish_nanos = now;
     447           0 :   }
     448           0 :   return gossip->outbound_budget.remaining;
     449           0 : }
     450             : 
     451             : static inline void
     452             : txbuild_flush( fd_gossip_t *         gossip,
     453             :                fd_gossip_txbuild_t * txbuild,
     454             :                fd_stem_context_t *   stem,
     455             :                fd_ip4_port_t         dest_addr,
     456           0 :                long                  now ) {
     457           0 :   if( FD_UNLIKELY( !txbuild->crds_len ) ) return;
     458             : 
     459             :   /* Debit the outbound data budget (gossip payload bytes only, not
     460             :      including IP/UDP headers — matching Agave's DataBudget which
     461             :      operates on serialized gossip-layer packet sizes). */
     462           0 :   gossip->outbound_budget.remaining -= fd_ulong_min( txbuild->bytes_len, gossip->outbound_budget.remaining );
     463             : 
     464           0 :   gossip->send_fn( gossip->send_ctx, stem, txbuild->bytes, txbuild->bytes_len, &dest_addr, (ulong)now );
     465             : 
     466           0 :   gossip->metrics->message_tx[ txbuild->tag ]++;
     467           0 :   gossip->metrics->message_tx_bytes[ txbuild->tag ] += txbuild->bytes_len+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
     468           0 :   for( ulong i=0UL; i<txbuild->crds_len; i++ ) {
     469           0 :     gossip->metrics->crds_tx_pull_response[ txbuild->crds[ i ].tag ]++;
     470           0 :     gossip->metrics->crds_tx_pull_response_bytes[ txbuild->crds[ i ].tag ] += txbuild->crds[ i ].sz;
     471           0 :   }
     472             : 
     473           0 :   fd_gossip_txbuild_init( txbuild, gossip->identity_pubkey, txbuild->tag );
     474           0 : }
     475             : 
     476             : static void
     477             : rx_pull_request( fd_gossip_t *                    gossip,
     478             :                  fd_gossip_pull_request_t const * pr_view,
     479             :                  fd_ip4_port_t                    peer_addr,
     480             :                  fd_stem_context_t *              stem,
     481           0 :                  long                             now ) {
     482             :   /* Replenish and check outbound data budget.  If the budget is
     483             :      exhausted, skip generating pull responses entirely. */
     484           0 :   if( FD_UNLIKELY( !outbound_budget_replenish( gossip, now ) ) ) return;
     485             : 
     486             :   /* When responding to a pull request, we skip CRDS entries whose
     487             :      wallclock is newer than the caller's wallclock + a random jitter.
     488             :      The jitter is drawn uniformly from [0, TIMEOUT/4) ms, matching
     489             :      Agave's behavior (CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS = 15000ms).
     490             :      This prevents all responders from consistently excluding the same
     491             :      set of very-recent CRDS values. */
     492           0 : #define FD_GOSSIP_PULL_JITTER_BOUND_MS  (15000UL/4UL)
     493             : 
     494             :   /* Generate a random jitter in [0, 3750) ms, added to the caller's
     495             :      wallclock.  CRDS entries newer than this adjusted threshold are
     496             :      excluded from the response.  The jitter prevents all responders
     497             :      from consistently excluding the same near-boundary entries,
     498             :      improving cluster-wide convergence of recent values. */
     499           0 :   ulong caller_wallclock_ms    = pr_view->contact_info->wallclock;
     500           0 :   ulong jitter_ms              = fd_rng_ulong_roll( gossip->rng, FD_GOSSIP_PULL_JITTER_BOUND_MS );
     501           0 :   ulong adjusted_wallclock_ms  = caller_wallclock_ms + jitter_ms;
     502             : 
     503           0 :   ulong keys[ sizeof(pr_view->crds_filter->filter->keys)/sizeof(ulong) ];
     504           0 :   ulong bits[ sizeof(pr_view->crds_filter->filter->bits)/sizeof(ulong) ];
     505           0 :   fd_memcpy( keys, pr_view->crds_filter->filter->keys, sizeof(pr_view->crds_filter->filter->keys) );
     506           0 :   fd_memcpy( bits, pr_view->crds_filter->filter->bits, sizeof(pr_view->crds_filter->filter->bits) );
     507             : 
     508           0 :   fd_bloom_t filter[1];
     509           0 :   filter->keys_len = pr_view->crds_filter->filter->keys_len;
     510           0 :   filter->keys = keys;
     511             : 
     512           0 :   filter->bits_len = pr_view->crds_filter->filter->bits_len;
     513           0 :   filter->bits     = bits;
     514             : 
     515           0 :   fd_gossip_txbuild_t pull_resp[1];
     516           0 :   fd_gossip_txbuild_init( pull_resp, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PULL_RESPONSE );
     517             : 
     518           0 :   uchar iter_mem[ 16UL ];
     519             : 
     520           0 :   for( fd_crds_mask_iter_t * it=fd_crds_mask_iter_init( gossip->crds, pr_view->crds_filter->mask, pr_view->crds_filter->mask_bits, iter_mem );
     521           0 :        !fd_crds_mask_iter_done( it, gossip->crds );
     522           0 :        it=fd_crds_mask_iter_next( it, gossip->crds ) ) {
     523           0 :     fd_crds_entry_t const * candidate = fd_crds_mask_iter_entry( it, gossip->crds );
     524             : 
     525             :     /* Skip CRDS entries whose originator wallclock is newer than the
     526             :        caller's wallclock + jitter.  The caller hasn't had time to
     527             :        observe these values yet, so including them would be wasteful. */
     528           0 :     if( FD_UNLIKELY( fd_crds_entry_wallclock( candidate )>adjusted_wallclock_ms ) ) continue;
     529             : 
     530           0 :     if( FD_UNLIKELY( fd_bloom_contains( filter, fd_crds_entry_hash( candidate ), 32UL ) ) ) continue;
     531             : 
     532           0 :     uchar const * crds_val;
     533           0 :     ulong         crds_size;
     534           0 :     fd_crds_entry_value( candidate, &crds_val, &crds_size );
     535           0 :     if( FD_UNLIKELY( !fd_gossip_txbuild_can_fit( pull_resp, crds_size ) ) ) txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
     536           0 :     fd_gossip_txbuild_append( pull_resp, crds_size, crds_val );
     537           0 :     if( FD_UNLIKELY( !gossip->outbound_budget.remaining ) ) break;
     538           0 :   }
     539             : 
     540           0 :   txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
     541           0 : }
     542             : 
     543             : static void
     544             : rx_values( fd_gossip_t *             gossip,
     545             :            ulong                     values_len,
     546             :            fd_gossip_value_t const * values,
     547             :            uchar const *             payload,
     548             :            uchar const *             failed,
     549             :            fd_stem_context_t *       stem,
     550             :            long                      now,
     551           0 :            long                      results[ static 17UL ] ) {
     552           0 :   for( ulong i=0UL; i<values_len; i++ ) {
     553           0 :     fd_gossip_value_t const * value = &values[ i ];
     554             : 
     555           0 :     if( FD_UNLIKELY( failed[ i ] ) ) {
     556           0 :       uchar candidate_hash[ 32UL ];
     557           0 :       fd_sha256_hash( payload+value->offset, value->length, candidate_hash );
     558           0 :       if( FD_LIKELY( failed[ i ]==FD_GOSSIP_FAILED_NO_CONTACT_INFO ) ) fd_gossip_purged_insert_no_contact_info( gossip->purged, value->origin, candidate_hash, now );
     559           0 :       else                                                             fd_gossip_purged_insert_failed_insert( gossip->purged, candidate_hash, now );
     560           0 :       continue;
     561           0 :     }
     562             : 
     563           0 :     ulong origin_stake = get_stake( gossip, value->origin );
     564           0 :     int origin_ping_tracker_active = fd_ping_tracker_active( gossip->ping_tracker, value->origin );
     565           0 :     int is_me = !memcmp( value->origin, gossip->identity_pubkey, 32UL );
     566             : 
     567           0 :     results[ i ] = fd_crds_insert( gossip->crds, value, payload+value->offset, value->length, origin_stake, origin_ping_tracker_active, is_me, now, stem );
     568           0 :     if( FD_UNLIKELY( results[ i ] ) ) continue;
     569             : 
     570           0 :     if( FD_UNLIKELY( value->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
     571           0 :       fd_ip4_port_t origin_addr = {
     572           0 :         .addr = value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
     573           0 :         .port = value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port
     574           0 :       };
     575           0 :       if( FD_LIKELY( !is_me ) ) fd_ping_tracker_track( gossip->ping_tracker, value->origin, origin_stake, origin_addr, now );
     576             : 
     577             :       /* We just learned this peer's contact info.  Drain any
     578             :          no_contact_info hashes associated with this origin from the
     579             :          purged set so peers re-send those CRDS values. */
     580           0 :       if( FD_LIKELY( fd_ping_tracker_active( gossip->ping_tracker, value->origin ) ) ) fd_gossip_purged_drain_no_contact_info( gossip->purged, value->origin );
     581           0 :     }
     582             : 
     583           0 :     fd_active_set_push( gossip->active_set, payload+value->offset, value->length, value->origin, origin_stake, stem, now, 0 );
     584           0 :   }
     585           0 : }
     586             : 
     587             : static void
     588             : rx_pull_response( fd_gossip_t *                     gossip,
     589             :                   fd_gossip_pull_response_t const * pull_response,
     590             :                   uchar const *                     payload,
     591             :                   uchar const *                     failed,
     592             :                   fd_stem_context_t *               stem,
     593           0 :                   long                              now ) {
     594           0 :   long results[ 17UL ];
     595           0 :   rx_values( gossip, pull_response->values_len, pull_response->values, payload, failed, stem, now, results );
     596           0 :   for( ulong i=0UL; i<pull_response->values_len; i++ ) {
     597           0 :     if( FD_UNLIKELY( failed[ i ] ) ) continue;
     598           0 :     if( FD_LIKELY( !results[ i ] ) ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PULL_RESPONSE_IDX ]++;
     599           0 :     else if( results[ i ]<0L )       gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_STALE_IDX ]++;
     600           0 :     else                             gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_DUPLICATE_IDX ]++;
     601           0 :   }
     602           0 : }
     603             : 
     604             : /* tx_prune constructs, signs, and sends a prune message telling
     605             :    `relayer` to stop pushing CRDS values originating from `origin`.
     606             : 
     607             :    On-wire layout (bincode):
     608             :      Protocol tag        4  (FD_GOSSIP_MESSAGE_PRUNE = 3)
     609             :      sender pubkey      32  (= identity_pubkey, outer PruneMessage field)
     610             :      PruneData.pubkey   32  (= identity_pubkey)
     611             :      prunes_len          8
     612             :      prunes[1]          32
     613             :      signature          64
     614             :      destination        32
     615             :      wallclock           8
     616             : 
     617             :    The signable data (input to Ed25519 sign) is the PruneData fields
     618             :    excluding signature:
     619             :      prefix[26] + pubkey[32] + prunes_len[8] + prunes[32] + destination[32] + wallclock[8]
     620             :    This must match fd_keyguard_payload_matches_prune_data (106 + 32 bytes). */
     621             : 
     622             : static void
     623             : tx_prune( fd_gossip_t *       gossip,
     624             :           uchar const *       relayer,
     625             :           uchar const *       origin,
     626             :           fd_stem_context_t * stem,
     627           0 :           long                now ) {
     628           0 :   ulong ci_idx = fd_crds_ci_idx( gossip->crds, relayer );
     629           0 :   if( FD_UNLIKELY( ci_idx==ULONG_MAX ) ) return;
     630             : 
     631           0 :   fd_gossip_contact_info_t const * ci = fd_crds_ci( gossip->crds, ci_idx );
     632           0 :   fd_ip4_port_t dest_addr = {
     633           0 :     .addr = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
     634           0 :     .port = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port
     635           0 :   };
     636           0 :   if( FD_UNLIKELY( !dest_addr.addr || !dest_addr.port ) ) return;
     637             : 
     638           0 :   ulong wallclock = (ulong)FD_NANOSEC_TO_MILLI( now );
     639             : 
     640             :   /* Build the signable payload:
     641             :      prefix[26] + pubkey[32] + prunes_len[8] + prunes[32] + destination[32] + wallclock[8] */
     642           0 :   uchar signable[ 26UL + 32UL + 8UL + 32UL + 32UL + 8UL ];
     643           0 :   uchar * p = signable;
     644           0 :   FD_STORE( ulong, p, 18UL );                    p += 8UL;
     645           0 :   fd_memcpy( p, "\xffSOLANA_PRUNE_DATA", 18UL ); p += 18UL;
     646           0 :   fd_memcpy( p, gossip->identity_pubkey, 32UL ); p += 32UL;
     647           0 :   FD_STORE( ulong, p, 1UL );                     p += 8UL;
     648           0 :   fd_memcpy( p, origin, 32UL );                  p += 32UL;
     649           0 :   fd_memcpy( p, relayer, 32UL );                 p += 32UL;
     650           0 :   FD_STORE( ulong, p, wallclock );               p += 8UL;
     651             : 
     652           0 :   uchar signature[ 64UL ];
     653           0 :   gossip->sign_fn( gossip->sign_ctx, signable, sizeof(signable), FD_KEYGUARD_SIGN_TYPE_ED25519, signature );
     654             : 
     655             :   /* Build the on-wire packet:
     656             :      tag(4) + sender(32) + pubkey(32) + prunes_len(8) + prunes[32]
     657             :      + signature(64) + destination(32) + wallclock(8) */
     658           0 :   uchar pkt[ 4UL + 32UL + 32UL + 8UL + 32UL + 64UL + 32UL + 8UL ];
     659           0 :   uchar * q = pkt;
     660           0 :   FD_STORE( uint, q, FD_GOSSIP_MESSAGE_PRUNE );  q += 4UL;
     661           0 :   fd_memcpy( q, gossip->identity_pubkey, 32UL ); q += 32UL;  /* sender */
     662           0 :   fd_memcpy( q, gossip->identity_pubkey, 32UL ); q += 32UL;  /* PruneData.pubkey */
     663           0 :   FD_STORE( ulong, q, 1UL );                     q += 8UL;
     664           0 :   fd_memcpy( q, origin, 32UL );                  q += 32UL;
     665           0 :   fd_memcpy( q, signature, 64UL );               q += 64UL;
     666           0 :   fd_memcpy( q, relayer, 32UL );                 q += 32UL;
     667           0 :   FD_STORE( ulong, q, wallclock );               q += 8UL;
     668             : 
     669           0 :   gossip->send_fn( gossip->send_ctx, stem, pkt, sizeof(pkt), &dest_addr, (ulong)now );
     670             : 
     671           0 :   gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PRUNE ]++;
     672           0 :   gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PRUNE ] += sizeof(pkt) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
     673           0 : }
     674             : 
     675             : static void
     676             : tx_prunes( fd_gossip_t *       gossip,
     677             :              fd_stem_context_t * stem,
     678           0 :              long                now ) {
     679           0 :   uchar const * relayer;
     680           0 :   uchar const * origin;
     681           0 :   while( fd_prune_finder_pop_prune( gossip->prune_finder, &relayer, &origin ) ) {
     682           0 :     tx_prune( gossip, relayer, origin, stem, now );
     683           0 :   }
     684           0 : }
     685             : 
     686             : static void
     687             : rx_push( fd_gossip_t *            gossip,
     688             :          fd_gossip_push_t const * push,
     689             :          uchar const *            payload,
     690             :          uchar const *            failed,
     691             :          long                     now,
     692           0 :          fd_stem_context_t *      stem ) {
     693           0 :   long results[ 17UL ];
     694           0 :   rx_values( gossip, push->values_len, push->values, payload, failed, stem, now, results );
     695             : 
     696           0 :   for( ulong i=0UL; i<push->values_len; i++ ) {
     697           0 :     if( FD_UNLIKELY( failed[ i ] ) ) continue;
     698           0 :     if( FD_LIKELY( !results[ i ] ) ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PUSH_IDX ]++;
     699           0 :     else if( results[ i ]<0L )       gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_STALE_IDX ]++;
     700           0 :     else                             gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_DUPLICATE_IDX ]++;
     701             : 
     702           0 :     ulong num_dups;
     703           0 :     if( FD_LIKELY( !results[ i ] ) )          num_dups = 0UL;
     704           0 :     else if( FD_UNLIKELY( results[ i ]<0L ) ) num_dups = ULONG_MAX; /* stale => never timely */
     705           0 :     else                                      num_dups = (ulong)results[ i ];
     706             : 
     707           0 :     ulong origin_stake = get_stake( gossip, push->values[ i ].origin );
     708           0 :     fd_prune_finder_record( gossip->prune_finder, push->values[ i ].origin, origin_stake, push->from, get_stake( gossip, push->from ), num_dups );
     709           0 :   }
     710             : 
     711           0 :   tx_prunes( gossip, stem, now );
     712           0 : }
     713             : 
     714             : static void
     715             : rx_prune( fd_gossip_t *             gossip,
     716           0 :           fd_gossip_prune_t const * prune ) {
     717           0 :   for( ulong i=0UL; i<prune->prunes_len; i++ ) {
     718           0 :     fd_active_set_prune( gossip->active_set,
     719           0 :                          prune->pubkey,
     720           0 :                          prune->prunes[ i ],
     721           0 :                          get_stake( gossip, prune->prunes[ i ] ) );
     722           0 :   }
     723           0 : }
     724             : 
     725             : 
     726             : static void
     727             : rx_ping( fd_gossip_t *            gossip,
     728             :          fd_gossip_ping_t const * ping,
     729             :          fd_ip4_port_t            peer_address,
     730             :          fd_stem_context_t *      stem,
     731           0 :          long                     now ) {
     732           0 :   uchar out_payload[ sizeof(fd_gossip_pong_t)+4UL];
     733           0 :   FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PONG );
     734             : 
     735           0 :   fd_gossip_pong_t * out_pong = (fd_gossip_pong_t *)(out_payload + 4UL);
     736           0 :   fd_memcpy( out_pong->from, gossip->identity_pubkey, 32UL );
     737             : 
     738             :   /* fd_keyguard checks payloads for certain patterns before performing the
     739             :      sign. Pattern-matching can't be done on hashed data, so we need
     740             :      to supply the pre-hashed image to the sign fn (fd_keyguard will hash when
     741             :      supplied with FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519) while also hashing
     742             :      the image ourselves onto pong->ping_hash */
     743             : 
     744           0 :   uchar pre_image[ 48UL ];
     745           0 :   fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
     746           0 :   fd_memcpy( pre_image+16UL, ping->token, 32UL );
     747             : 
     748           0 :   fd_sha256_hash( pre_image, 48UL, out_pong->hash );
     749             : 
     750           0 :   gossip->sign_fn( gossip->sign_ctx, pre_image, 48UL, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519, out_pong->signature );
     751           0 :   gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), &peer_address, (ulong)now );
     752             : 
     753           0 :   gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PONG ]++;
     754           0 :   gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PONG ] += sizeof(out_payload)+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
     755           0 : }
     756             : 
     757             : static void
     758             : rx_pong( fd_gossip_t *            gossip,
     759             :          fd_gossip_pong_t const * pong,
     760             :          fd_ip4_port_t            peer_address,
     761           0 :          long                     now ) {
     762           0 :   ulong stake = get_stake( gossip, pong->from );
     763           0 :   fd_ping_tracker_register( gossip->ping_tracker, pong->from, stake, peer_address, pong->hash, now );
     764           0 : }
     765             : 
     766             : void
     767             : fd_gossip_rx( fd_gossip_t *       gossip,
     768             :               fd_ip4_port_t       peer,
     769             :               uchar const *       data,
     770             :               ulong               data_sz,
     771             :               long                now,
     772           0 :               fd_stem_context_t * stem ) {
     773             :   /* TODO: Implement traffic shaper / bandwidth limiter */
     774           0 :   FD_TEST( data_sz>=sizeof(fd_gossip_message_t)+FD_GOSSIP_MESSAGE_MAX_CRDS );
     775           0 :   fd_gossip_message_t const * message = (fd_gossip_message_t const *)data;
     776           0 :   uchar const *               failed  = data+sizeof(fd_gossip_message_t);
     777           0 :   uchar const *               payload = data+sizeof(fd_gossip_message_t)+FD_GOSSIP_MESSAGE_MAX_CRDS;
     778             : 
     779           0 :   switch( message->tag ) {
     780           0 :     case FD_GOSSIP_MESSAGE_PULL_REQUEST:  rx_pull_request( gossip, message->pull_request, peer, stem, now );              break;
     781           0 :     case FD_GOSSIP_MESSAGE_PULL_RESPONSE: rx_pull_response( gossip, message->pull_response, payload, failed, stem, now ); break;
     782           0 :     case FD_GOSSIP_MESSAGE_PUSH:          rx_push( gossip, message->push, payload, failed, now, stem );                   break;
     783           0 :     case FD_GOSSIP_MESSAGE_PRUNE:         rx_prune( gossip, message->prune );                                             break;
     784           0 :     case FD_GOSSIP_MESSAGE_PING:          rx_ping( gossip, message->ping, peer, stem, now );                              break;
     785           0 :     case FD_GOSSIP_MESSAGE_PONG:          rx_pong( gossip, message->pong, peer, now );                                    break;
     786           0 :     default:
     787           0 :       FD_LOG_CRIT(( "Unknown gossip message type %u", message->tag ));
     788           0 :       break;
     789           0 :   }
     790           0 : }
     791             : 
     792             : static int
     793             : fd_gossip_push( fd_gossip_t *             gossip,
     794             :                 fd_gossip_value_t const * value,
     795             :                 fd_stem_context_t *       stem,
     796           0 :                 long                      now ) {
     797           0 :   uchar serialized[ FD_GOSSIP_VALUE_MAX_SZ ];
     798           0 :   long serialized_sz = fd_gossip_value_serialize( value, serialized, sizeof(serialized) );
     799           0 :   FD_TEST( serialized_sz!=-1L );
     800           0 :   gossip->sign_fn( gossip->sign_ctx, serialized+64UL, (ulong)serialized_sz-64UL, FD_KEYGUARD_SIGN_TYPE_ED25519, serialized );
     801             : 
     802           0 :   int origin_active = 0; /* Value doesn't matter, since is_me=1 it's never used. */
     803           0 :   if( FD_UNLIKELY( fd_crds_insert( gossip->crds, value, serialized, (ulong)serialized_sz, gossip->identity_stake, origin_active, 1, now, stem ) ) ) return -1;
     804             : 
     805           0 :   fd_active_set_push( gossip->active_set, serialized, (ulong)serialized_sz, gossip->identity_pubkey, gossip->identity_stake, stem, now, 1 );
     806           0 :   return 0;
     807           0 : }
     808             : 
     809             : int
     810             : fd_gossip_push_vote( fd_gossip_t *       gossip,
     811             :                      uchar const *       txn,
     812             :                      ulong               txn_sz,
     813             :                      fd_stem_context_t * stem,
     814           0 :                      long                now ) {
     815           0 :   fd_gossip_value_t value = {
     816           0 :     .tag = FD_GOSSIP_VALUE_VOTE,
     817           0 :     .wallclock = (ulong)FD_NANOSEC_TO_MILLI( now ),
     818           0 :     .vote = {{
     819           0 :       .index = 0UL, /* TODO */
     820           0 :       .transaction_len = txn_sz,
     821           0 :     }}
     822           0 :   };
     823           0 :   fd_memcpy( value.origin, gossip->identity_pubkey, 32UL );
     824           0 :   FD_TEST( txn_sz<=sizeof(value.vote->transaction) );
     825           0 :   fd_memcpy( value.vote->transaction, txn, txn_sz );
     826             : 
     827           0 :   return fd_gossip_push( gossip, &value, stem, now );
     828           0 : }
     829             : 
     830             : int
     831             : fd_gossip_push_duplicate_shred( fd_gossip_t *                       gossip,
     832             :                                 fd_gossip_duplicate_shred_t const * duplicate_shred,
     833             :                                 fd_stem_context_t *                 stem,
     834           0 :                                 long                                now ) {
     835           0 :   fd_gossip_value_t value = {
     836           0 :     .tag = FD_GOSSIP_VALUE_DUPLICATE_SHRED,
     837           0 :     .wallclock = (ulong)FD_NANOSEC_TO_MILLI( now ),
     838           0 :   };
     839           0 :   fd_memcpy( value.origin, gossip->identity_pubkey, 32UL );
     840           0 :   *value.duplicate_shred = *duplicate_shred;
     841             : 
     842           0 :   return fd_gossip_push( gossip, &value, stem, now );
     843           0 : }
     844             : 
     845             : static void
     846             : tx_ping( fd_gossip_t *       gossip,
     847             :          fd_stem_context_t * stem,
     848             :          long                now,
     849           0 :          int *               charge_busy ) {
     850           0 :   uchar out_payload[ sizeof(fd_gossip_ping_t) + 4UL ];
     851           0 :   FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PING );
     852             : 
     853           0 :   fd_gossip_ping_t * out_ping = (fd_gossip_ping_t *)( out_payload+4UL );
     854           0 :   fd_memcpy( out_ping->from, gossip->identity_pubkey, 32UL );
     855             : 
     856           0 :   uchar const *         peer_pubkey;
     857           0 :   uchar const *         ping_token;
     858           0 :   fd_ip4_port_t const * peer_address;
     859           0 :   while( fd_ping_tracker_pop_request( gossip->ping_tracker,
     860           0 :                                       now,
     861           0 :                                       &peer_pubkey,
     862           0 :                                       &peer_address,
     863           0 :                                       &ping_token ) ) {
     864           0 :     fd_memcpy( out_ping->token, ping_token, 32UL );
     865             : 
     866           0 :     gossip->sign_fn( gossip->sign_ctx, out_ping->token, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519, out_ping->signature );
     867           0 :     gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), peer_address, (ulong)now );
     868             : 
     869           0 :     gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PING ]++;
     870           0 :     gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PING ] += sizeof(out_payload) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
     871           0 :     if( charge_busy ) *charge_busy = 1;
     872           0 :   }
     873           0 : }
     874             : 
     875             : /* Construct and send a pull request to a random peer.  The pull
     876             :    request contains a bloom filter over our known CRDS hashes so that
     877             :    the peer can respond with values we are missing.
     878             : 
     879             :    NOTE: Divergence from Agave:
     880             :     - Agave builds up to 2^mask_bits filters per pull period
     881             :       (sampling up to 1024), each covering a distinct partition of
     882             :       the hash space.  We build and send exactly one filter per
     883             :       pull period, covering 1/2^mask_bits of the space.
     884             : 
     885             :    Maximum bloom filter bits in a PullRequest packet:
     886             : 
     887             :      PACKET_DATA_SIZE             = 1232   (= 1280 - 40 - 8)
     888             : 
     889             :      Bytes consumed by non-bloom fields:
     890             :        discriminant(4) + keys_len(8) + keys(8*num_keys) +
     891             :        has_bits(1) + bloom_vec_len(8) + bloom_bits_count(8) +
     892             :        bloom_num_bits_set(8) + mask(8) + mask_bits(4)
     893             :        + contact_info_crds_val(crds_val_sz)
     894             :        = 49 + 8*num_keys + crds_val_sz
     895             : 
     896             :      The bitvec is serialized as u64 words, so the bitvec storage is
     897             :      ceil(num_bits/64)*8 bytes.  The remaining packet bytes must
     898             :      accommodate this.
     899             : 
     900             :      Agave determines the max_bytes parameter (input to Bloom::random)
     901             :      via an empirical cache (get_max_bloom_filter_bytes).  max_bytes*8
     902             :      is passed as the max_bits cap to Bloom::random, but actual
     903             :      num_bits is only ~83% of max_bits (the E/D ratio for p=0.1).
     904             :      We replicate this with a closed-form inversion: the largest
     905             :      max_bytes where ceil(num_bits/64)*8 fits in remaining space is
     906             :      max_bytes = floor(D * floor(64*W/E) / 8), where W is the max
     907             :      number of u64 words, E and D are the bloom filter constants.
     908             : 
     909             :      num_keys depends on the bloom sizing, which depends on the
     910             :      overhead, which depends on num_keys.  However there is a closed
     911             :      form: compute num_keys from the pessimistic KEYS=8 overhead, then
     912             :      recompute the tight overhead with the true num_keys.  This always
     913             :      converges in one step because the optimal key count is
     914             :      D*ln(2) ≈ 3.32 (where D = ln(p)/ln(1/2^ln2)), far from any
     915             :      rounding boundary.  For p=0.1 and KEYS=8, num_keys is always 3.
     916             : 
     917             :      NB: The has_bits(1) + bloom_vec_len(8) are only written when
     918             :      num_bits>=1.  fd_bloom_num_bits clamps to [1, max_bits], so
     919             :      num_bits>=1 always holds and this layout is correct. */
     920             : 
     921             : static void
     922             : tx_pull_request( fd_gossip_t *       gossip,
     923             :                  fd_stem_context_t * stem,
     924           0 :                  long                now ) {
     925           0 :   ulong total_crds_vals = fd_crds_len( gossip->crds ) + fd_gossip_purged_len( gossip->purged );
     926           0 :   ulong num_items       = fd_ulong_max( 65536UL, total_crds_vals );
     927           0 :   ulong crds_val_sz     = gossip->my_contact_info.crds_val_sz;
     928             : 
     929             :   /* Step 1: Compute num_keys from the pessimistic KEYS=8 overhead
     930             :      (same initial estimate Agave uses in CrdsFilterSet::new). */
     931           0 :   ulong  pessimistic_overhead = 49UL + 8UL*(ulong)BLOOM_NUM_KEYS + crds_val_sz;
     932           0 :   FD_TEST( pessimistic_overhead<FD_GOSSIP_MTU );
     933           0 :   double pessimistic_max_bits = (double)( 8UL*( FD_GOSSIP_MTU - pessimistic_overhead ) );
     934           0 :   double pessimistic_items    = fd_bloom_max_items( pessimistic_max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
     935           0 :   FD_TEST( pessimistic_items>0.0 );
     936           0 :   ulong  pessimistic_num_bits = fd_bloom_num_bits( pessimistic_items, BLOOM_FALSE_POSITIVE_RATE, pessimistic_max_bits );
     937           0 :   ulong  num_keys             = fd_bloom_num_keys( (double)pessimistic_num_bits, pessimistic_items );
     938             : 
     939             :   /* Step 2: Recompute with the tight overhead using the true num_keys.
     940             :      Find the largest max_bytes parameter (matching Agave's
     941             :      get_max_bloom_filter_bytes cache) such that the resulting bitvec
     942             :      fits in the remaining packet space.
     943             : 
     944             :      Given:
     945             :        max_items = ceil(max_bits / D)   where D = -K / ln(1-exp(ln(p)/K))
     946             :        num_bits  = ceil(max_items * E)  where E = ln(p) / ln(1/2^ln2)
     947             : 
     948             :      We need ceil(num_bits/64)*8 <= remaining, i.e. num_bits <= 64*W
     949             :      where W = floor(remaining/8).  Working backwards:
     950             :        max_items <= I  where I = floor(64*W / E)
     951             :        max_bytes <= D*I / 8
     952             : 
     953             :      So max_bytes = floor(D * floor(64*W/E) / 8). */
     954           0 :   ulong  overhead       = 49UL + 8UL*num_keys + crds_val_sz;
     955           0 :   FD_TEST( overhead<FD_GOSSIP_MTU );
     956           0 :   ulong  remaining      = FD_GOSSIP_MTU - overhead;
     957           0 :   ulong  max_words      = remaining / 8UL; /* max u64 words for bitvec */
     958             : 
     959           0 :   double E = log( BLOOM_FALSE_POSITIVE_RATE ) / log( 1.0 / pow( 2.0, log( 2.0 ) ) );
     960           0 :   double D = -BLOOM_NUM_KEYS / log( 1.0 - exp( log( BLOOM_FALSE_POSITIVE_RATE ) / BLOOM_NUM_KEYS ) );
     961           0 :   ulong  I = (ulong)floor( 64.0 * (double)max_words / E );
     962           0 :   ulong  max_bytes = (ulong)floor( D * (double)I / 8.0 );
     963             : 
     964           0 :   double max_bits  = (double)( max_bytes * 8UL );
     965           0 :   double max_items = fd_bloom_max_items( max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
     966           0 :   FD_TEST( max_items>0.0 );
     967           0 :   ulong  num_bits  = fd_bloom_num_bits( max_items, BLOOM_FALSE_POSITIVE_RATE, max_bits );
     968           0 :   FD_TEST( num_bits>=1UL );
     969           0 :   FD_TEST( (num_bits+63UL)/64UL<=max_words ); /* verify bitvec fits */
     970           0 :   FD_TEST( fd_bloom_num_keys( (double)num_bits, max_items )==num_keys ); /* verify convergence */
     971             : 
     972           0 :   double _mask_bits     = ceil( log2( (double)num_items / max_items ) );
     973           0 :   uint   mask_bits      = _mask_bits >= 0.0 ? fd_uint_min( (uint)_mask_bits, 63U ) : 0U;
     974           0 :   ulong  mask           = fd_rng_ulong( gossip->rng ) | (~0UL>>(mask_bits));
     975             : 
     976           0 :   uchar payload[ FD_GOSSIP_MTU ] = {0};
     977             : 
     978           0 :   ulong * keys_ptr, * bits_ptr, * bits_set;
     979           0 :   long payload_sz = fd_gossip_pull_request_init( payload,
     980           0 :                                                  FD_GOSSIP_MTU,
     981           0 :                                                  num_keys,
     982           0 :                                                  num_bits,
     983           0 :                                                  mask,
     984           0 :                                                  mask_bits,
     985           0 :                                                  gossip->my_contact_info.crds_val,
     986           0 :                                                  gossip->my_contact_info.crds_val_sz,
     987           0 :                                                  &keys_ptr,
     988           0 :                                                  &bits_ptr,
     989           0 :                                                  &bits_set );
     990           0 :   FD_TEST( -1L!=payload_sz );
     991             : 
     992           0 :   fd_bloom_t filter[1];
     993           0 :   fd_bloom_init_inplace( keys_ptr, bits_ptr, num_keys, num_bits, 0, gossip->rng, BLOOM_FALSE_POSITIVE_RATE, filter );
     994             : 
     995           0 :   uchar iter_mem[ 16UL ];
     996           0 :   for( fd_crds_mask_iter_t * it = fd_crds_mask_iter_init( gossip->crds, mask, mask_bits, iter_mem );
     997           0 :        !fd_crds_mask_iter_done( it, gossip->crds );
     998           0 :        it = fd_crds_mask_iter_next( it, gossip->crds ) ) {
     999           0 :     fd_bloom_insert( filter, fd_crds_entry_hash( fd_crds_mask_iter_entry( it, gossip->crds ) ), 32UL );
    1000           0 :   }
    1001             : 
    1002           0 :   for( fd_gossip_purged_mask_iter_t * it = fd_gossip_purged_mask_iter_init( gossip->purged, mask, mask_bits, iter_mem );
    1003           0 :        !fd_gossip_purged_mask_iter_done( it, gossip->purged );
    1004           0 :        it = fd_gossip_purged_mask_iter_next( it, gossip->purged ) ){
    1005           0 :     fd_bloom_insert( filter, fd_gossip_purged_mask_iter_hash( it, gossip->purged ), 32UL );
    1006           0 :   }
    1007             : 
    1008           0 :   int num_bits_set = 0;
    1009           0 :   for( ulong i=0UL; i<(num_bits+63)/64UL; i++ ) num_bits_set += fd_ulong_popcnt( bits_ptr[ i ] );
    1010           0 :   *bits_set = (ulong)num_bits_set;
    1011             : 
    1012           0 :   ulong idx = fd_gossip_wsample_sample_pull_request( gossip->wsample );
    1013           0 :   fd_ip4_port_t peer_addr;
    1014           0 :   if( FD_UNLIKELY( idx==ULONG_MAX ) ) {
    1015           0 :     if( FD_UNLIKELY( !gossip->entrypoints_cnt ) ) {
    1016             :       /* We are the bootstrapping node, and nobody else is present in
    1017             :          the cluster.  Nowhere to send the pull request. */
    1018           0 :       return;
    1019           0 :     }
    1020           0 :     peer_addr = random_entrypoint( gossip );
    1021           0 :   } else {
    1022           0 :     fd_gossip_contact_info_t const * peer = fd_crds_ci( gossip->crds, idx );
    1023           0 :     peer_addr.addr = peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0 : peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4;
    1024           0 :     peer_addr.port = peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port;
    1025           0 :   }
    1026           0 :   gossip->send_fn( gossip->send_ctx, stem, payload, (ulong)payload_sz, &peer_addr, (ulong)now );
    1027             : 
    1028           0 :   gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PULL_REQUEST ]++;
    1029           0 :   gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PULL_REQUEST ] += (ulong)payload_sz + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
    1030           0 : }
    1031             : 
    1032             : void
    1033             : fd_gossip_advance( fd_gossip_t *       gossip,
    1034             :                    long                now,
    1035             :                    fd_stem_context_t * stem,
    1036           0 :                    int *               charge_busy ) {
    1037           0 :   outbound_budget_replenish( gossip, now );
    1038             : 
    1039           0 :   fd_gossip_purged_expire( gossip->purged, now );
    1040           0 :   fd_active_set_advance( gossip->active_set, stem, now, charge_busy );
    1041           0 :   fd_crds_advance( gossip->crds, now, stem, charge_busy );
    1042             : 
    1043           0 :   tx_ping( gossip, stem, now, charge_busy );
    1044           0 :   if( FD_UNLIKELY( now>=gossip->timers.next_pull_request ) ) {
    1045           0 :     tx_pull_request( gossip, stem, now );
    1046           0 :     if( charge_busy ) *charge_busy = 1;
    1047             :     /* 1.6ms (625/s).  Agave sends min(1024, ceil(2^mask_bits/8))
    1048             :        filters every 500ms.  For a typical mainnet table (~65k items,
    1049             :        mask_bits≈7) that is ~16 filters/500ms = one every 31ms.  We
    1050             :        send a single filter per round, so we fire ~20× more often to
    1051             :        compensate for sending one filter instead of many per period.
    1052             : 
    1053             :        We considered dynamically matching Agave's exact rate by
    1054             :        computing 500ms/filters_per_round from mask_bits each round,
    1055             :        but this caused slow table fill on startup (mask_bits starts
    1056             :        low -> long intervals -> few pulls -> slow CRDS population).
    1057             :        Adaptive boosting (counter-based, timestamp-based, and
    1058             :        threshold-based) all added complexity without clear benefit:
    1059             :        counter decay lost state between send and response arrival,
    1060             :        timestamp checks never disarmed because trickle inserts kept
    1061             :        refreshing the window, and threshold heuristics required
    1062             :        tuning constants that varied by cluster size.
    1063             : 
    1064             :        A fixed 1.6ms is simpler and robust: the cost of a redundant
    1065             :        pull request is negligible (a single 1232-byte packet whose
    1066             :        reply will be empty if we're already caught up), and it
    1067             :        guarantees fast table fill on startup without any adaptive
    1068             :        machinery. */
    1069           0 :     gossip->timers.next_pull_request = now+1600L*1000L;
    1070           0 :   }
    1071           0 :   if( FD_UNLIKELY( now>=gossip->timers.next_contact_info_refresh ) ) {
    1072             :     /* TODO: Frequency of this? More often if observing? */
    1073           0 :     refresh_contact_info( gossip, now );
    1074           0 :     int origin_active = 0; /* Value doesn't matter, since is_me=1 it's never used. */
    1075           0 :     fd_crds_insert( gossip->crds, gossip->my_contact_info.ci, gossip->my_contact_info.crds_val, gossip->my_contact_info.crds_val_sz, gossip->identity_stake, origin_active, 1, now, stem );
    1076           0 :     fd_active_set_push( gossip->active_set, gossip->my_contact_info.crds_val, gossip->my_contact_info.crds_val_sz, gossip->identity_pubkey, gossip->identity_stake, stem, now, 1 );
    1077           0 :     gossip->timers.next_contact_info_refresh = now+15L*500L*1000L*1000L; /* TODO: Jitter */
    1078           0 :     if( charge_busy ) *charge_busy = 1;
    1079           0 :   }
    1080           0 : }
    1081             : 
    1082             : void
    1083             : fd_gossip_ping_tracker_track( fd_gossip_t * gossip,
    1084             :                               uchar const * peer_pubkey,
    1085             :                               fd_ip4_port_t peer_address,
    1086           0 :                               long          now ) {
    1087           0 :   ulong origin_stake = get_stake( gossip, peer_pubkey );
    1088           0 :   fd_ping_tracker_track( gossip->ping_tracker, peer_pubkey, origin_stake, peer_address, now );
    1089           0 : }

Generated by: LCOV version 1.14