LCOV - code coverage report
Current view: top level - discof/repair - fd_policy.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 257 0.0 %
Date: 2026-03-19 18:19:27 Functions: 0 16 0.0 %

          Line data    Source code
       1             : #include "fd_policy.h"
       2             : #include "../../disco/metrics/fd_metrics.h"
       3             : 
       4             : #define NONCE_NULL        (UINT_MAX)
       5           0 : #define DEFER_REPAIR_MS   (200UL)
       6           0 : #define TARGET_TICK_PER_SLOT (64.0)
       7           0 : #define MS_PER_TICK          (400.0 / TARGET_TICK_PER_SLOT)
       8             : 
/* fd_policy_new formats an unused memory region for use as a policy
   object.  dedup_max bounds the request dedup cache, peer_max bounds
   the peer table, seed seeds the hash maps, and rnonce_ss is copied in
   for repair nonce derivation.  shmem must be non-NULL, aligned to
   fd_policy_align(), and span fd_policy_footprint( dedup_max, peer_max )
   bytes.  Returns shmem on success, NULL on failure (logs details). */

void *
fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed, fd_rnonce_ss_t const * rnonce_ss ) {

  if( FD_UNLIKELY( !shmem ) ) {
    FD_LOG_WARNING(( "NULL mem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_policy_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned mem" ));
    return NULL;
  }

  ulong footprint = fd_policy_footprint( dedup_max, peer_max );
  fd_memset( shmem, 0, footprint );

  /* Carve the region into the policy header followed by the dedup and
     peer data structures.  This layout must stay in sync with
     fd_policy_footprint (verified by the FD_TEST below). */
  ulong peer_chain_cnt = fd_policy_peer_map_chain_cnt_est( peer_max );
  FD_SCRATCH_ALLOC_INIT( l, shmem );
  fd_policy_t * policy     = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(),            sizeof(fd_policy_t)                            );
  void *        dedup_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_map_align(),  fd_policy_dedup_map_footprint ( dedup_max )    );
  void *        dedup_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_pool_align(), fd_policy_dedup_pool_footprint( dedup_max )    );
  void *        dedup_lru  = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_lru_align(),  fd_policy_dedup_lru_footprint()                );
  void *        peers      = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_map_align(),   fd_policy_peer_map_footprint( peer_chain_cnt ) );
  void *        peers_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_pool_align(),  fd_policy_peer_pool_footprint( peer_max )      );
  void *        peers_fast = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_dlist_align(), fd_policy_peer_dlist_footprint()               );
  void *        peers_slow = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_dlist_align(), fd_policy_peer_dlist_footprint()               );
  FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_policy_align() ) == (ulong)shmem + footprint );

  /* Format each sub-structure in place and initialize policy state. */
  policy->dedup.map     = fd_policy_dedup_map_new ( dedup_map,  dedup_max,      seed );
  policy->dedup.pool    = fd_policy_dedup_pool_new( dedup_pool, dedup_max            );
  policy->dedup.lru     = fd_policy_dedup_lru_new ( dedup_lru                        );
  policy->peers.map     = fd_policy_peer_map_new  ( peers,      peer_chain_cnt, seed );
  policy->peers.pool    = fd_policy_peer_pool_new ( peers_pool, peer_max             );
  policy->peers.fast    = fd_policy_peer_dlist_new( peers_fast                       );
  policy->peers.slow    = fd_policy_peer_dlist_new( peers_slow                       );
  policy->turbine_slot0 = ULONG_MAX; /* no turbine slot observed yet */
  policy->rnonce_ss[0]  = *rnonce_ss;

  return shmem;
}
      49             : 
/* fd_policy_join joins the caller to a policy object, converting the
   stored sub-structure handles into local joins and resetting the peer
   selection cursor.  shpolicy must point to a region formatted by
   fd_policy_new that resides inside a workspace.  Returns the local
   join on success, NULL on failure (logs details). */

fd_policy_t *
fd_policy_join( void * shpolicy ) {
  fd_policy_t * policy = (fd_policy_t *)shpolicy;

  if( FD_UNLIKELY( !policy ) ) {
    FD_LOG_WARNING(( "NULL policy" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned policy" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( policy );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "policy must be part of a workspace" ));
    return NULL;
  }

  policy->dedup.map  = fd_policy_dedup_map_join ( policy->dedup.map  );
  policy->dedup.pool = fd_policy_dedup_pool_join( policy->dedup.pool );
  policy->dedup.lru  = fd_policy_dedup_lru_join ( policy->dedup.lru  );
  policy->peers.map  = fd_policy_peer_map_join  ( policy->peers.map  );
  policy->peers.pool = fd_policy_peer_pool_join ( policy->peers.pool );
  policy->peers.fast = fd_policy_peer_dlist_join( policy->peers.fast );
  policy->peers.slow = fd_policy_peer_dlist_join( policy->peers.slow );

  /* NOTE(review): the select iterator always starts on the slow list,
     while fd_policy_peer_select picks the list from bucket_stages[stage]
     -- confirm bucket_stages[0] maps to the slow list, otherwise the
     first selection walks a stale iterator. */
  policy->peers.select.iter  = fd_policy_peer_dlist_iter_fwd_init( policy->peers.slow, policy->peers.pool );
  policy->peers.select.stage = 0;

  return policy;
}
      83             : 
      84             : void *
      85           0 : fd_policy_leave( fd_policy_t const * policy ) {
      86             : 
      87           0 :   if( FD_UNLIKELY( !policy ) ) {
      88           0 :     FD_LOG_WARNING(( "NULL policy" ));
      89           0 :     return NULL;
      90           0 :   }
      91             : 
      92           0 :   return (void *)policy;
      93           0 : }
      94             : 
      95             : void *
      96           0 : fd_policy_delete( void * policy ) {
      97             : 
      98           0 :   if( FD_UNLIKELY( !policy ) ) {
      99           0 :     FD_LOG_WARNING(( "NULL policy" ));
     100           0 :     return NULL;
     101           0 :   }
     102             : 
     103           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
     104           0 :     FD_LOG_WARNING(( "misaligned policy" ));
     105           0 :     return NULL;
     106           0 :   }
     107             : 
     108           0 :   return policy;
     109           0 : }
     110             : 
     111             : /* dedup_evict evicts the first element returned by the map iterator. */
     112             : 
     113             : static void
     114           0 : dedup_evict( fd_policy_t * policy ) {
     115           0 :   fd_policy_dedup_ele_t * ele = fd_policy_dedup_lru_ele_pop_head( policy->dedup.lru, policy->dedup.pool );
     116           0 :   fd_policy_dedup_map_ele_remove( policy->dedup.map, &ele->key, NULL, policy->dedup.pool );
     117           0 :   fd_policy_dedup_pool_ele_release( policy->dedup.pool, ele );
     118           0 : }
     119             : 
/* dedup_next returns 1 if key is deduped (a request for key was issued
   less than FD_POLICY_DEDUP_TIMEOUT ago), 0 otherwise.  A 0 return
   stamps the entry with now, so the caller is expected to actually
   issue the request.  Untracked keys are inserted, evicting the LRU
   entry if the pool is full. */
static int
dedup_next( fd_policy_t * policy, ulong key, long now ) {
  fd_policy_dedup_t *     dedup = &policy->dedup;
  fd_policy_dedup_ele_t * ele   = fd_policy_dedup_map_ele_query( dedup->map, &key, NULL, dedup->pool );
  if( FD_UNLIKELY( !ele ) ) {
    /* Not tracked yet: acquire an entry (evicting if needed) with
       req_ts 0 so the timeout test below fails (assuming now is at
       least FD_POLICY_DEDUP_TIMEOUT, true for a wallclock). */
    if( FD_UNLIKELY( !fd_policy_dedup_pool_free( dedup->pool ) ) ) dedup_evict( policy );
    ele         = fd_policy_dedup_pool_ele_acquire( dedup->pool );
    ele->key    = key;
    ele->req_ts = 0;
    fd_policy_dedup_map_ele_insert   ( dedup->map, ele, dedup->pool );
    fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool );
  }
  if( FD_LIKELY( now < ele->req_ts + (long)FD_POLICY_DEDUP_TIMEOUT ) ) {
    /* Still within the dedup window: move the entry to the LRU tail
       (most recently touched) and report it deduped. */
    fd_policy_dedup_lru_ele_remove( dedup->lru, ele, dedup->pool );
    fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool );
    return 1;
  }
  ele->req_ts = now;
  return 0;
}
     141             : 
     142           0 : static ulong ts_ms( long wallclock ) {
     143           0 :   return (ulong)wallclock / (ulong)1e6;
     144           0 : }
     145             : 
     146             : static int
     147           0 : passes_throttle_threshold( fd_policy_t * policy, fd_forest_blk_t * ele ) {
     148           0 :   if( FD_UNLIKELY( ele->slot < policy->turbine_slot0 ) ) return 1;
     149             :   /* Essentially is checking if current duration of block ( from the
     150             :      first shred received until now ) is greater than the highest tick
     151             :      received + 200ms. */
     152           0 :   double current_duration = (double)(fd_tickcount() - ele->first_shred_ts) / fd_tempo_tick_per_ns(NULL);
     153           0 :   double tick_plus_buffer = (ele->est_buffered_tick_recv * MS_PER_TICK + DEFER_REPAIR_MS) * 1e6; // change to 400e6 for a slot duration policy
     154             : 
     155           0 :   if( current_duration >= tick_plus_buffer ){
     156           0 :     FD_MCNT_INC( REPAIR, EAGER_REPAIR_AGGRESSES, 1 );
     157           0 :     return 1;
     158           0 :   }
     159           0 :   return 0;
     160           0 : }
     161             : 
/* fd_policy_peer_select returns the pubkey of the next peer to target
   with a repair request, or NULL if no peers are known.  Peers are
   walked round-robin within a latency bucket (fast or slow dlist); the
   bucket_stages schedule decides which bucket each stage draws from,
   and the stage advances whenever the current bucket's iterator is
   exhausted.  Selection state persists across calls in
   policy->peers.select. */
fd_pubkey_t const *
fd_policy_peer_select( fd_policy_t * policy ) {
  fd_policy_peer_dlist_t * best_dlist  = policy->peers.fast;
  fd_policy_peer_dlist_t * worst_dlist = policy->peers.slow;
  fd_policy_peer_t       * pool        = policy->peers.pool;

  if( FD_UNLIKELY( fd_policy_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;

  fd_policy_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;

  /* Exhausted the current bucket: advance through the stage schedule
     until we land on a bucket with elements.  NOTE(review): this loop
     relies on every pooled peer being linked into one of the two
     dlists; if both were empty while the pool is non-empty it would
     not terminate -- confirm that invariant holds. */
  while( FD_UNLIKELY( fd_policy_peer_dlist_iter_done( policy->peers.select.iter, dlist, pool ) ) ) {
    policy->peers.select.stage = (policy->peers.select.stage + 1) % (sizeof(bucket_stages) / sizeof(uint));
    dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;
    policy->peers.select.iter = fd_policy_peer_dlist_iter_fwd_init( dlist, pool );
  }
  fd_policy_peer_t * select = fd_policy_peer_dlist_iter_ele( policy->peers.select.iter, dlist, pool );
  policy->peers.select.iter = fd_policy_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, pool );
  return &select->key;
}
     181             : 
/* fd_policy_next drives the repair request policy.  Returns the next
   repair protocol message to send (orphan, highest_shred, or shred
   request), or NULL when there is nothing to request right now.  Sets
   *charge_busy to 1 iff any work was performed.  Request priority:

     1. orphan requests for every disconnected subtree root, then
     2. highest_shred / shred requests along the forest iterators,
        throttled near the head of turbine (passes_throttle_threshold).

   Candidate requests are deduped against recently issued ones. */
fd_repair_msg_t const *
fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair, long now, ulong highest_known_slot, int * charge_busy ) {
  fd_forest_blk_t *      pool     = fd_forest_pool( forest );
  fd_forest_subtlist_t * subtlist = fd_forest_subtlist( forest );
  *charge_busy = 0;

  /* Nothing to do without a rooted forest or any peers to ask. */
  if( FD_UNLIKELY( forest->root == ULONG_MAX ) ) return NULL;
  if( FD_UNLIKELY( fd_policy_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;

  fd_repair_msg_t * out = NULL;
  ulong now_ms = ts_ms( now );

  /* First priority: request the unknown ancestry of orphaned subtrees. */
  for( fd_forest_subtlist_iter_t iter = fd_forest_subtlist_iter_fwd_init( subtlist, pool );
                                       !fd_forest_subtlist_iter_done    ( iter, subtlist, pool );
                                 iter = fd_forest_subtlist_iter_fwd_next( iter, subtlist, pool ) ) {
    *charge_busy = 1;
    fd_forest_blk_t * orphan = fd_forest_subtlist_iter_ele( iter, subtlist, pool );
    ulong key                = fd_policy_dedup_key( FD_REPAIR_KIND_ORPHAN, orphan->slot, UINT_MAX );
    if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
      uint nonce = fd_rnonce_ss_compute( policy->rnonce_ss, 0, orphan->slot, 0U, now );
      out = fd_repair_orphan( repair, fd_policy_peer_select( policy ), now_ms, nonce, orphan->slot );
      return out;
    }
  }

  /* Select a slot to operate on 🔪. Advance either the orphan iter or
     regular iter. */
  fd_forest_iter_t * iter = NULL;
  if( FD_UNLIKELY( fd_forest_reqslist_is_empty( fd_forest_reqslist( forest ), fd_forest_reqspool( forest ) ) ) ) {
    /* If the main tree has nothing to iterate at the moment, we can
       request down the ORPHAN trees on slots we know about. */
    iter = &forest->orphiter;
  } else {
    iter = &forest->iter;
  }

  fd_forest_iter_next( iter, forest );
  if( FD_UNLIKELY( fd_forest_iter_done( iter, forest ) ) ) {
    // This happens when we have already requested all the shreds we know about.
    return NULL;
  }

  fd_forest_blk_t * ele = fd_forest_pool_ele( pool, iter->ele_idx );
  if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
    /* When we are at the head of the turbine, we should give turbine the
       chance to complete the shreds.  Agave waits 200ms from the
       estimated "correct time" of the highest shred received to repair.
       i.e. if we've received the first 200 shreds, the 200th has a tick
       of x. Translate that to millis, and we should wait to request shred
       201 until x + 200ms.  If we have a hole, i.e. first 200 shreds
       receive except shred 100, and the 101th shred has a tick of y, we
       should wait until y + 200ms to request shred 100.

       Here we did not pass the timeout threshold, so we are not ready
       to repair this slot yet.  But it's possible we have another fork
       that we need to repair... so we just should skip to the next SLOT
       in the main tree iterator.  The likelihood that this ele is the
       head of turbine is high, which means that the shred_idx of the
       iterf is likely to be UINT_MAX, which means calling
       fd_forest_iter_next will advance the iterf to the next slot. */
    iter->shred_idx = UINT_MAX;
    /* TODO: Heinous... but the easiest way to ensure this slot gets
       added back to the requests deque is if we set the shred_idx to
       UINT_MAX, but maybe there should be an explicit API for it. */

    return NULL;
  }

  *charge_busy = 1;

  if( FD_UNLIKELY( iter->shred_idx == UINT_MAX ) ) {
    if( FD_UNLIKELY( ele->slot < highest_known_slot ) ) {
      // We'll never know the highest shred for the current turbine slot, so there's no point in requesting it.
      uint nonce = fd_rnonce_ss_compute( policy->rnonce_ss, 0, ele->slot, 0U, now );
      out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, nonce, ele->slot, 0 );
    }
  } else {
    uint nonce = fd_rnonce_ss_compute( policy->rnonce_ss, 1, ele->slot, iter->shred_idx, now );
    out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, nonce, ele->slot, iter->shred_idx );
    /* Record when the first repair request for this block went out. */
    if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
  }
  return out;
}
     265             : 
     266             : fd_policy_peer_t const *
     267           0 : fd_policy_peer_upsert( fd_policy_t * policy, fd_pubkey_t const * key, fd_ip4_port_t const * addr ) {
     268           0 :   fd_policy_peer_map_t * peer_map = policy->peers.map;
     269           0 :   fd_policy_peer_t * pool = policy->peers.pool;
     270           0 :   fd_policy_peer_t * peer = fd_policy_peer_map_ele_query( peer_map, key, NULL, pool );
     271           0 :   if( FD_UNLIKELY( !peer && fd_policy_peer_pool_free( pool ) ) ) {
     272           0 :     peer = fd_policy_peer_pool_ele_acquire( pool );
     273           0 :     peer->key  = *key;
     274           0 :     peer->ip4  = addr->addr;
     275           0 :     peer->port = addr->port;
     276           0 :     peer->req_cnt       = 0;
     277           0 :     peer->res_cnt       = 0;
     278           0 :     peer->first_req_ts  = 0;
     279           0 :     peer->last_req_ts   = 0;
     280           0 :     peer->first_resp_ts = 0;
     281           0 :     peer->last_resp_ts  = 0;
     282           0 :     peer->total_lat     = 0;
     283           0 :     peer->stake         = 0;
     284           0 :     peer->ping          = 0;
     285             : 
     286           0 :     fd_policy_peer_map_ele_insert( peer_map, peer, pool );
     287           0 :     fd_policy_peer_dlist_ele_push_tail( policy->peers.slow, peer, pool );
     288           0 :     return peer;
     289           0 :   }
     290           0 :   if( FD_LIKELY( peer ) ) {
     291           0 :     peer->ip4  = addr->addr;
     292           0 :     peer->port = addr->port;
     293           0 :   }
     294           0 :   return NULL;
     295           0 : }
     296             : 
     297             : fd_policy_peer_t *
     298           0 : fd_policy_peer_query( fd_policy_t * policy, fd_pubkey_t const * key ) {
     299           0 :   if( FD_UNLIKELY( memcmp( key->key, null_pubkey.key, 32UL ) == 0 ) ) {
     300           0 :     FD_LOG_WARNING(( "Repair policy peer with null pubkey." ));
     301           0 :     return NULL;
     302           0 :   };
     303           0 :   fd_policy_peer_t * pool = policy->peers.pool;
     304           0 :   return fd_policy_peer_map_ele_query( policy->peers.map, key, NULL, pool );
     305           0 : }
     306             : 
/* fd_policy_peer_remove removes the peer keyed by key from the policy
   (map, latency bucket, and pool).  Returns 1 if a peer was removed,
   0 if no such peer.  Safe with respect to the ongoing peer selection
   iteration: if the select iterator currently points at the removed
   peer it is advanced past it first. */
int
fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) {
  fd_policy_peer_t * pool = policy->peers.pool;
  fd_policy_peer_t * peer = fd_policy_peer_map_ele_query( policy->peers.map, key, NULL, pool );
  if( FD_UNLIKELY( !peer ) ) return 0;

  if( FD_UNLIKELY( policy->peers.select.iter == fd_policy_peer_pool_idx( pool, peer ) ) ) {
    /* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */
    fd_policy_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? policy->peers.fast : policy->peers.slow;
    policy->peers.select.iter = fd_policy_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, pool );
  }

  /* The peer's current bucket is derived from its latency stats; look
     it up before releasing the element. */
  fd_policy_peer_dlist_t * bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
  fd_policy_peer_dlist_ele_remove( bucket, peer, pool );
  fd_policy_peer_map_ele_remove  ( policy->peers.map, key, NULL, pool );
  fd_policy_peer_pool_ele_release( pool,   peer );
  return 1;
}
     325             : 
     326             : void
     327           0 : fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to ) {
     328           0 :   fd_policy_peer_t * active = fd_policy_peer_query( policy, to );
     329           0 :   if( FD_LIKELY( active ) ) {
     330           0 :     active->req_cnt++;
     331           0 :     active->last_req_ts = fd_tickcount();
     332           0 :     if( FD_UNLIKELY( active->first_req_ts == 0 ) ) active->first_req_ts = active->last_req_ts;
     333           0 :   }
     334           0 : }
     335             : 
/* fd_policy_peer_response_update records a response from peer `to`
   with round-trip time rtt (ns): bumps the response count and
   timestamps, accumulates total latency, and migrates the peer between
   latency buckets (fast/slow dlists) if its average latency crossed
   the bucket threshold.  No-op if the peer is unknown. */
void
fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt /* ns */ ) {
  fd_policy_peer_t * peer = fd_policy_peer_query( policy, to );
  if( FD_LIKELY( peer ) ) {
    long now = fd_tickcount();
    /* Snapshot the current bucket BEFORE mutating the latency stats;
       the bucket is a pure function of (total_lat, res_cnt). */
    fd_policy_peer_dlist_t * prev_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
    peer->res_cnt++;
    if( FD_UNLIKELY( peer->first_resp_ts == 0 ) ) peer->first_resp_ts = now;
    peer->last_resp_ts = now;
    peer->total_lat   += rtt;
    fd_policy_peer_dlist_t * new_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt  );

    /* Average latency crossed a threshold: move peer to its new bucket. */
    if( prev_bucket != new_bucket ) {
      fd_policy_peer_dlist_ele_remove   ( prev_bucket, peer, policy->peers.pool );
      fd_policy_peer_dlist_ele_push_tail( new_bucket,  peer, policy->peers.pool );
    }
  }
}
     354             : 
/* fd_policy_set_turbine_slot0 records the first slot this validator
   observed via turbine.  Slots before this can never arrive via
   turbine, so passes_throttle_threshold repairs them without delay. */
void
fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot ) {
  policy->turbine_slot0 = slot;
}
     359             : 

Generated by: LCOV version 1.14