LCOV - code coverage report
Current view: top level - flamenco/gossip - fd_active_set.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 186 0.0 %
Date: 2026-03-19 18:19:27 Functions: 0 12 0.0 %

          Line data    Source code
       1             : #include "fd_active_set.h"
       2             : #include "fd_gossip_txbuild.h"
       3             : #include "fd_gossip_wsample.h"
       4             : #include "fd_bloom.h"
       5             : #include "../../util/net/fd_net_headers.h"
       6             : 
       7             : #define FD_ACTIVE_SET_STAKE_ENTRIES    (25UL)
       8             : #define FD_ACTIVE_SET_PEERS_PER_ENTRY  (12UL)
       9             : #define FD_ACTIVE_SET_MAX_PEERS        (FD_ACTIVE_SET_STAKE_ENTRIES*FD_ACTIVE_SET_PEERS_PER_ENTRY) /* 300 */
      10             : 
      11             : struct fd_active_set_peer {
      12             :   long         timestamp;
      13             :   ulong        ci_idx;
      14             :   fd_bloom_t * bloom;
      15             :   fd_gossip_txbuild_t txbuild[1];
      16             : 
      17             :   struct {
      18             :     ulong prev;
      19             :     ulong next;
      20             :   } dlist;
      21             : };
      22             : 
      23             : typedef struct fd_active_set_peer fd_active_set_peer_t;
      24             : 
      25             : #define DLIST_NAME  push_dlist
      26             : #define DLIST_ELE_T fd_active_set_peer_t
      27           0 : #define DLIST_PREV  dlist.prev
      28           0 : #define DLIST_NEXT  dlist.next
      29             : #include "../../util/tmpl/fd_dlist.c"
      30             : 
      31             : struct fd_active_set_entry {
      32             :   ulong nodes_idx; /* points to oldest entry in set */
      33             :   ulong nodes_len;
      34             : };
      35             : 
      36             : typedef struct fd_active_set_entry fd_active_set_entry_t;
      37             : 
      38             : struct __attribute__((aligned(FD_ACTIVE_SET_ALIGN))) fd_active_set_private {
      39             :   fd_active_set_entry_t entries[ FD_ACTIVE_SET_STAKE_ENTRIES ][ 1 ];
      40             :   fd_active_set_peer_t peers[ FD_ACTIVE_SET_MAX_PEERS ];
      41             : 
      42             :   long  next_rotate_nanos;
      43             :   ulong rotate_bucket; /* 0..24, round-robin */
      44             : 
      45             :   uchar identity_pubkey[ 32UL ];
      46             :   ulong identity_stake;
      47             : 
      48             :   fd_gossip_wsample_t * wsample;
      49             :   fd_crds_t * crds;
      50             :   fd_rng_t * rng;
      51             :   push_dlist_t * push_dlist;
      52             : 
      53             :   fd_gossip_send_fn send_fn;
      54             :   void *            send_fn_ctx;
      55             : 
      56             :   fd_active_set_metrics_t metrics[1];
      57             : 
      58             :   ulong magic; /* ==FD_ACTIVE_SET_MAGIC */
      59             : };
      60             : 
      61             : FD_FN_CONST ulong
      62           0 : fd_active_set_align( void ) {
      63           0 :   return FD_ACTIVE_SET_ALIGN;
      64           0 : }
      65             : 
      66             : FD_FN_CONST ulong
      67           0 : fd_active_set_footprint( void ) {
      68           0 :   ulong l;
      69           0 :   l = FD_LAYOUT_INIT;
      70           0 :   l = FD_LAYOUT_APPEND( l, FD_ACTIVE_SET_ALIGN, sizeof(fd_active_set_t) );
      71           0 :   l = FD_LAYOUT_APPEND( l, FD_BLOOM_ALIGN,      25UL*12UL*fd_bloom_footprint( 0.1, 32768UL ) );
      72           0 :   l = FD_LAYOUT_APPEND( l, push_dlist_align(),  push_dlist_footprint() );
      73           0 :   return FD_LAYOUT_FINI( l, FD_ACTIVE_SET_ALIGN );
      74           0 : }
      75             : 
      76             : void *
      77             : fd_active_set_new( void *                shmem,
      78             :                    fd_gossip_wsample_t * wsample,
      79             :                    fd_crds_t *           crds,
      80             :                    fd_rng_t *            rng,
      81             :                    uchar const *         identity_pubkey,
      82             :                    ulong                 identity_stake,
      83             :                    fd_gossip_send_fn     send_fn,
      84           0 :                    void *                send_fn_ctx ) {
      85           0 :   if( FD_UNLIKELY( !shmem ) ) {
      86           0 :     FD_LOG_WARNING(( "NULL shmem" ));
      87           0 :     return NULL;
      88           0 :   }
      89             : 
      90           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_active_set_align() ) ) ) {
      91           0 :     FD_LOG_WARNING(( "misaligned shmem" ));
      92           0 :     return NULL;
      93           0 :   }
      94             : 
      95           0 :   ulong bloom_footprint = fd_bloom_footprint( 0.1, 32768UL );
      96             : 
      97           0 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
      98           0 :   fd_active_set_t * as       = FD_SCRATCH_ALLOC_APPEND( l, FD_ACTIVE_SET_ALIGN, sizeof(fd_active_set_t) );
      99           0 :   uchar * _blooms            = FD_SCRATCH_ALLOC_APPEND( l, FD_BLOOM_ALIGN,      25UL*12UL*bloom_footprint );
     100           0 :   push_dlist_t * _push_dlist = FD_SCRATCH_ALLOC_APPEND( l, push_dlist_align(),  push_dlist_footprint() );
     101             : 
     102           0 :   as->next_rotate_nanos = 0L;
     103           0 :   as->rotate_bucket     = 0UL;
     104           0 :   fd_memcpy( as->identity_pubkey, identity_pubkey, 32UL );
     105           0 :   as->identity_stake = identity_stake;
     106             : 
     107           0 :   as->wsample = wsample;
     108           0 :   as->crds = crds;
     109           0 :   as->rng = rng;
     110           0 :   for( ulong i=0UL; i<25UL; i++ ) {
     111           0 :     fd_active_set_entry_t * entry = as->entries[ i ];
     112           0 :     entry->nodes_idx = 0UL;
     113           0 :     entry->nodes_len = 0UL;
     114             : 
     115           0 :     for( ulong j=0UL; j<12UL; j++ ) {
     116           0 :       fd_active_set_peer_t * peer = &as->peers[ i*12UL+j ];
     117           0 :       peer->bloom = fd_bloom_join( fd_bloom_new( _blooms, rng, 0.1, 32768UL ) );
     118           0 :       if( FD_UNLIKELY( !peer->bloom ) ) {
     119           0 :         FD_LOG_WARNING(( "failed to create bloom filter" ));
     120           0 :         return NULL;
     121           0 :       }
     122           0 :       _blooms += bloom_footprint;
     123           0 :     }
     124           0 :   }
     125             : 
     126           0 :   as->push_dlist = push_dlist_join( push_dlist_new( _push_dlist ) );
     127             : 
     128           0 :   as->send_fn = send_fn;
     129           0 :   as->send_fn_ctx = send_fn_ctx;
     130             : 
     131           0 :   memset( as->metrics, 0, sizeof(fd_active_set_metrics_t) );
     132             : 
     133           0 :   FD_COMPILER_MFENCE();
     134           0 :   FD_VOLATILE( as->magic ) = FD_ACTIVE_SET_MAGIC;
     135           0 :   FD_COMPILER_MFENCE();
     136             : 
     137           0 :   return (void *)as;
     138           0 : }
     139             : 
     140             : fd_active_set_t *
     141           0 : fd_active_set_join( void * shas ) {
     142           0 :   if( FD_UNLIKELY( !shas ) ) {
     143           0 :     FD_LOG_WARNING(( "NULL shas" ));
     144           0 :     return NULL;
     145           0 :   }
     146             : 
     147           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shas, fd_active_set_align() ) ) ) {
     148           0 :     FD_LOG_WARNING(( "misaligned shas" ));
     149           0 :     return NULL;
     150           0 :   }
     151             : 
     152           0 :   fd_active_set_t * as = (fd_active_set_t *)shas;
     153             : 
     154           0 :   if( FD_UNLIKELY( as->magic!=FD_ACTIVE_SET_MAGIC ) ) {
     155           0 :     FD_LOG_WARNING(( "bad magic" ));
     156           0 :     return NULL;
     157           0 :   }
     158             : 
     159           0 :   return as;
     160           0 : }
     161             : 
     162             : fd_active_set_metrics_t const *
     163           0 : fd_active_set_metrics( fd_active_set_t const * active_set ) {
     164           0 :   return active_set->metrics;
     165           0 : }
     166             : 
     167             : void
     168             : fd_active_set_set_identity( fd_active_set_t * active_set,
     169             :                             uchar const *     identity_pubkey,
     170           0 :                             ulong             identity_stake ) {
     171           0 :   fd_memcpy( active_set->identity_pubkey, identity_pubkey, 32UL );
     172           0 :   active_set->identity_stake = identity_stake;
     173           0 : }
     174             : 
     175             : void
     176             : fd_active_set_prune( fd_active_set_t * active_set,
     177             :                      uchar const *     push_dest,
     178             :                      uchar const *     origin,
     179           0 :                      ulong             origin_stake ) {
     180           0 :   if( FD_UNLIKELY( !memcmp( active_set->identity_pubkey, origin, 32UL ) ) ) return;
     181             : 
     182           0 :   ulong bucket = fd_active_set_stake_bucket( fd_ulong_min( active_set->identity_stake, origin_stake ) );
     183           0 :   for( ulong i=0UL; i<active_set->entries[ bucket ]->nodes_len; i++ ) {
     184           0 :     ulong peer_idx = (active_set->entries[ bucket ]->nodes_idx+i) % 12UL;
     185           0 :     uchar const * peer_pubkey = fd_crds_ci_pubkey( active_set->crds, active_set->peers[ bucket*12UL+peer_idx ].ci_idx );
     186           0 :     if( FD_UNLIKELY( !memcmp( peer_pubkey, push_dest, 32UL ) ) ) {
     187           0 :       fd_bloom_insert( active_set->peers[ bucket*12UL+peer_idx ].bloom, origin, 32UL );
     188           0 :       return;
     189           0 :     }
     190           0 :   }
     191           0 : }
     192             : 
     193             : static void
     194             : push_flush( fd_active_set_t *      active_set,
     195             :             fd_active_set_peer_t * peer,
     196             :             fd_stem_context_t *    stem,
     197           0 :             long                   now ) {
     198           0 :   if( FD_UNLIKELY( !peer->txbuild->crds_len ) ) return;
     199             : 
     200           0 :   fd_gossip_contact_info_t const * ci = fd_crds_ci( active_set->crds, peer->ci_idx );
     201             :   // TODO: Support ipv6, or prevent ending up in set
     202           0 :   fd_ip4_port_t dest_addr = {
     203           0 :     .addr = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
     204           0 :     .port = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port,
     205           0 :   };
     206             : 
     207           0 :   push_dlist_ele_remove( active_set->push_dlist, peer, active_set->peers );
     208             : 
     209           0 :   active_set->send_fn( active_set->send_fn_ctx, stem, peer->txbuild->bytes, peer->txbuild->bytes_len, &dest_addr, (ulong)now );
     210             : 
     211           0 :   active_set->metrics->message_tx[ peer->txbuild->tag ]++;
     212           0 :   active_set->metrics->message_tx_bytes[ peer->txbuild->tag ] += peer->txbuild->bytes_len+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
     213           0 :   for( ulong i=0UL; i<peer->txbuild->crds_len; i++ ) {
     214           0 :     active_set->metrics->crds_tx_push[ peer->txbuild->crds[ i ].tag ]++;
     215           0 :     active_set->metrics->crds_tx_push_bytes[ peer->txbuild->crds[ i ].tag ] += peer->txbuild->crds[ i ].sz;
     216           0 :   }
     217             : 
     218           0 :   fd_gossip_txbuild_init( peer->txbuild, active_set->identity_pubkey, FD_GOSSIP_MESSAGE_PUSH );
     219           0 : }
     220             : 
     221             : void
     222             : fd_active_set_remove_peer( fd_active_set_t * active_set,
     223           0 :                            ulong             ci_idx ) {
     224           0 :   for( ulong b=0UL; b<25UL; b++ ) {
     225           0 :     fd_active_set_entry_t * entry = active_set->entries[ b ];
     226             : 
     227           0 :     for( ulong i=0UL; i<entry->nodes_len; i++ ) {
     228           0 :       ulong peer_idx = (entry->nodes_idx+i) % 12UL;
     229           0 :       if( FD_UNLIKELY( active_set->peers[ b*12UL+peer_idx ].ci_idx==ci_idx ) ) {
     230           0 :         fd_active_set_peer_t * peer = &active_set->peers[ b*12UL+peer_idx ];
     231           0 :         if( FD_UNLIKELY( peer->txbuild->crds_len ) ) push_dlist_ele_remove( active_set->push_dlist, peer, active_set->peers );
     232             : 
     233           0 :         for( ulong j=i; j<entry->nodes_len-1UL; j++ ) {
     234           0 :           ulong from_idx = b*12UL+(entry->nodes_idx+j+1UL) % 12UL;
     235           0 :           ulong to_idx   = b*12UL+(entry->nodes_idx+j) % 12UL;
     236           0 :           fd_bloom_t * to_bloom = active_set->peers[ to_idx ].bloom;
     237           0 :           active_set->peers[ to_idx ] = active_set->peers[ from_idx ];
     238           0 :           active_set->peers[ from_idx ].bloom = to_bloom;
     239             :           /* If the moved element is in the push_dlist, fix up the
     240             :              dlist links so neighbors point to the new location.
     241             :              idx_replace reads prev/next from old_idx (from_idx, still
     242             :              intact) and patches neighbors + sentinel to reference
     243             :              to_abs instead. */
     244           0 :           if( FD_UNLIKELY( active_set->peers[ to_idx ].txbuild->crds_len ) ) push_dlist_idx_replace( active_set->push_dlist, to_idx, from_idx, active_set->peers );
     245           0 :         }
     246           0 :         entry->nodes_len--;
     247           0 :         if( FD_UNLIKELY( !entry->nodes_len ) ) entry->nodes_idx = 0UL;
     248           0 :         break;
     249           0 :       }
     250           0 :     }
     251           0 :   }
     252           0 : }
     253             : 
     254             : void
     255             : fd_active_set_push( fd_active_set_t *   active_set,
     256             :                     uchar const *       crds_val,
     257             :                     ulong               crds_sz,
     258             :                     uchar const *       origin_pubkey,
     259             :                     ulong               origin_stake,
     260             :                     fd_stem_context_t * stem,
     261             :                     long                now,
     262           0 :                     int                 flush_immediately ) {
     263           0 :   ulong stake_bucket = fd_active_set_stake_bucket( fd_ulong_min( active_set->identity_stake, origin_stake ) );
     264           0 :   fd_active_set_entry_t * entry = active_set->entries[ stake_bucket ];
     265             : 
     266           0 :   int originates_from_me = !memcmp( active_set->identity_pubkey, origin_pubkey, 32UL );
     267             : 
     268           0 :   for( ulong i=0UL; i<entry->nodes_len; i++ ) {
     269           0 :     fd_active_set_peer_t * peer = &active_set->peers[ stake_bucket*12UL+((entry->nodes_idx+i) % 12UL) ];
     270             : 
     271             :     /* If the value originated from us, we should always push it, even
     272             :        if theres a bloom filter hit, since bloom filters can have false
     273             :        positives and we don't want to accidentally not push our own
     274             :        values. */
     275           0 :     if( FD_UNLIKELY( fd_bloom_contains( peer->bloom, origin_pubkey, 32UL ) && !originates_from_me ) ) continue;
     276             : 
     277           0 :     if( FD_UNLIKELY( !fd_gossip_txbuild_can_fit( peer->txbuild, crds_sz ) ) ) push_flush( active_set, peer, stem, now );
     278           0 :     if( FD_UNLIKELY( !peer->txbuild->crds_len ) ) {
     279           0 :       peer->timestamp = now;
     280           0 :       push_dlist_ele_push_tail( active_set->push_dlist, peer, active_set->peers );
     281           0 :     }
     282           0 :     fd_gossip_txbuild_append( peer->txbuild, crds_sz, crds_val );
     283           0 :     if( FD_UNLIKELY( flush_immediately ) ) push_flush( active_set, peer, stem, now );
     284           0 :   }
     285           0 : }
     286             : 
     287             : static inline void
     288             : rotate_active_set( fd_active_set_t *   active_set,
     289             :                    fd_stem_context_t * stem,
     290           0 :                    long                now ) {
     291           0 :   ulong num_bloom_filter_items = fd_ulong_max( fd_crds_peer_count( active_set->crds ), 512UL );
     292             : 
     293           0 :   ulong bucket = active_set->rotate_bucket;
     294           0 :   active_set->rotate_bucket = (active_set->rotate_bucket+1UL) % 25UL;
     295           0 :   fd_active_set_entry_t * entry = active_set->entries[ bucket ];
     296             : 
     297             :   /* Sample a new peer BEFORE evicting the oldest.  This prevents the
     298             :      case where we evict a peer back into the sampler and then
     299             :      immediately re-sample it, creating a duplicate. */
     300             : 
     301           0 :   ulong added_ci_idx = fd_gossip_wsample_sample_remove_bucket( active_set->wsample, bucket );
     302           0 :   if( FD_UNLIKELY( added_ci_idx==ULONG_MAX ) ) return;
     303             : 
     304           0 :   ulong replace_idx;
     305           0 :   if( FD_LIKELY( entry->nodes_len==12UL ) ) {
     306           0 :     replace_idx      = entry->nodes_idx;
     307           0 :     entry->nodes_idx = (entry->nodes_idx+1UL) % 12UL;
     308             : 
     309             :     /* Add the replaced peer back to the sampler. */
     310           0 :     ulong old_ci_idx = active_set->peers[ bucket*12UL+replace_idx ].ci_idx;
     311           0 :     fd_gossip_wsample_add_bucket( active_set->wsample, bucket, old_ci_idx );
     312           0 :     push_flush( active_set, &active_set->peers[ bucket*12UL+replace_idx ], stem, now );
     313           0 :   } else {
     314           0 :     replace_idx = (entry->nodes_idx+entry->nodes_len) % 12UL;
     315           0 :   }
     316             : 
     317           0 :   fd_active_set_peer_t * replace = &active_set->peers[ bucket*12UL+replace_idx ];
     318           0 :   replace->ci_idx = added_ci_idx;
     319           0 :   uchar const * new_pubkey = fd_crds_ci_pubkey( active_set->crds, added_ci_idx );
     320             : 
     321           0 :   fd_bloom_initialize( replace->bloom, num_bloom_filter_items );
     322           0 :   fd_bloom_insert( replace->bloom, new_pubkey, 32UL );
     323           0 :   entry->nodes_len = fd_ulong_min( entry->nodes_len+1UL, 12UL );
     324           0 :   fd_gossip_txbuild_init( replace->txbuild, active_set->identity_pubkey, FD_GOSSIP_MESSAGE_PUSH );
     325           0 : }
     326             : 
     327             : 
     328             : void
     329             : fd_active_set_advance( fd_active_set_t *   active_set,
     330             :                        fd_stem_context_t * stem,
     331             :                        long                now,
     332           0 :                        int *               charge_busy ) {
     333           0 :   while( !push_dlist_is_empty( active_set->push_dlist, active_set->peers ) ) {
     334           0 :     fd_active_set_peer_t * head = push_dlist_ele_peek_head( active_set->push_dlist, active_set->peers );
     335           0 :     if( FD_LIKELY( head->timestamp>=now-1L*1000L*1000L ) ) break;
     336             : 
     337           0 :     push_flush( active_set, head, stem, now );
     338           0 :     if( charge_busy ) *charge_busy = 1;
     339           0 :   }
     340             : 
     341           0 :   if( FD_UNLIKELY( now>=active_set->next_rotate_nanos ) ) {
     342           0 :     rotate_active_set( active_set, stem, now );
     343           0 :     active_set->next_rotate_nanos = now+300L*1000L*1000L;
     344           0 :     if( charge_busy ) *charge_busy = 1;
     345           0 :   }
     346           0 : }

Generated by: LCOV version 1.14