LCOV - code coverage report
Current view: top level - flamenco/accdb - fd_accdb_admin_v2_root.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 234 0.0 %
Date: 2026-03-19 18:19:27 Functions: 0 6 0.0 %

          Line data    Source code
       1             : #include "fd_accdb_admin_v2_private.h"
       2             : #include "../fd_flamenco_base.h"
       3             : #include "../runtime/fd_runtime_const.h" /* FD_RUNTIME_ACC_SZ_MAX */
       4             : #include "../../vinyl/data/fd_vinyl_data.h"
       5             : 
       6             : /***********************************************************************
       7             : 
       8             :   fd_accdb_admin_v2_root.c contains the account rooting algorithm.
       9             : 
      10             :    This algorithm is designed to amortize vinyl I/O latency by
      11             :    processing accounts in batches.
      12             : 
      13             :    For each batch of accounts, it does the following logic:
      14             : 
      15             :    - ACQUIRE batch request for account updates
      16             :    - ERASE   batch request for account deletions
      17             :    - Spin wait for ACQUIRE completion
      18             :    - Copy back modified accounts
      19             :    - RELEASE batch request for account updates
      20             :    - Spin wait for RELEASE, ERASE completions
      21             :    - Free records from funk
      22             : 
      23             : ***********************************************************************/
      24             : 
      25             : /* vinyl_spin_wait waits for completion of a vinyl request and asserts
      26             :    that all requests completed successfully. */
      27             : 
      28             : static void
      29             : vinyl_spin_wait( fd_vinyl_comp_t const * comp,
      30             :                  fd_vinyl_key_t const *  key0,
      31             :                  schar const *           err0,
      32             :                  ulong                   cnt,
      33           0 :                  char const *            req_type_cstr ) {
      34             : 
      35             :   /* FIXME use a load-acquire here, such that later loads are ordered
      36             :            past this load */
      37           0 :   while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
      38           0 :   FD_COMPILER_MFENCE();
      39           0 :   int comp_err = FD_VOLATILE_CONST( comp->err );
      40           0 :   if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
      41           0 :     FD_LOG_CRIT(( "vinyl tile rejected my %s request (%i-%s)",
      42           0 :                   req_type_cstr, comp_err, fd_vinyl_strerror( comp_err ) ));
      43           0 :   }
      44             : 
      45           0 :   for( ulong i=0UL; i<cnt; i++ ) {
      46           0 :     int err = err0[ i ];
      47           0 :     if( FD_UNLIKELY( err!=FD_VINYL_SUCCESS && err!=FD_VINYL_ERR_KEY ) ) {
      48           0 :       FD_BASE58_ENCODE_32_BYTES( key0[i].uc, key_b58 );
      49           0 :       FD_LOG_CRIT(( "vinyl %s request failed for %s (%i-%s)",
      50           0 :                     req_type_cstr, key_b58, err, fd_vinyl_strerror( err ) ));
      51           0 :     }
      52           0 :   }
      53           0 : }
      54             : 
      55             : /* funk_rec_write_lock spins until it gains a write lock for a record,
      56             :    increments the version number, and returns the updated ver_lock
      57             :    value. */
      58             : 
      59             : static ulong
      60             : fd_funk_rec_admin_lock( fd_funk_t const * funk,
      61           0 :                         fd_funk_rec_t *   rec ) {
      62           0 :   ulong            rec_idx = (ulong)( rec - funk->rec_pool->ele );
      63           0 :   ulong volatile * vl      = &funk->rec_lock[ rec_idx ];
      64           0 :   for(;;) {
      65           0 :     ulong const ver_lock = FD_VOLATILE_CONST( *vl );
      66           0 :     ulong const ver      = fd_funk_rec_ver_bits ( ver_lock );
      67           0 :     ulong const lock     = fd_funk_rec_lock_bits( ver_lock );
      68           0 :     if( FD_UNLIKELY( lock ) ) {
      69             :       /* Spin while there are active readers */
      70             :       /* FIXME kill client after spinning for 30 seconds to prevent silent deadlock */
      71           0 :       FD_SPIN_PAUSE();
      72           0 :       continue;
      73           0 :     }
      74           0 :     ulong const new_ver = fd_funk_rec_ver_inc( ver );
      75           0 :     ulong const new_vl  = fd_funk_rec_ver_lock( new_ver, FD_FUNK_REC_LOCK_MASK );
      76           0 :     if( FD_UNLIKELY( FD_ATOMIC_CAS( vl, ver_lock, new_vl )!=ver_lock ) ) {
      77           0 :       FD_SPIN_PAUSE();
      78           0 :       continue;
      79           0 :     }
      80           0 :     return new_vl;
      81           0 :   }
      82           0 : }
      83             : 
      84             : static void
      85             : fd_funk_rec_admin_unlock( fd_funk_t const * funk,
      86             :                           fd_funk_rec_t *   rec,
      87           0 :                           ulong             ver_lock ) {
      88           0 :   ulong            rec_idx = (ulong)( rec - funk->rec_pool->ele );
      89           0 :   ulong volatile * vl      = &funk->rec_lock[ rec_idx ];
      90           0 :   FD_VOLATILE( *vl ) = fd_funk_rec_ver_lock( fd_funk_rec_ver_bits( ver_lock ), 0UL );
      91           0 : }
      92             : 
      93             : static void
      94             : funk_free_rec( fd_funk_t *     funk,
      95           0 :                fd_funk_rec_t * rec ) {
      96             :   /* Acquire admin lock (kick out readers)
      97             : 
      98             :      Note: At this point, well-behaving external readers will abandon a
      99             :      read-lock attempt if they observe this active write lock.  (An
     100             :      admin lock always implies the record is about to die) */
     101             : 
     102           0 :   FD_COMPILER_MFENCE();
     103           0 :   ulong ver_lock = fd_funk_rec_admin_lock( funk, rec );
     104             : 
     105             :   /* Free record */
     106             : 
     107           0 :   memset( &rec->pair, 0, sizeof(fd_funk_xid_key_pair_t) );
     108           0 :   FD_COMPILER_MFENCE();
     109           0 :   rec->map_next = FD_FUNK_REC_IDX_NULL;
     110           0 :   fd_funk_val_flush( rec, funk->alloc, funk->wksp );
     111           0 :   fd_funk_rec_admin_unlock( funk, rec, ver_lock );
     112           0 :   fd_funk_rec_pool_release( funk->rec_pool, rec, 1 );
     113           0 : }
     114             : 
     115             : /* funk_gc_chain optimistically deletes all but the newest rooted
     116             :    revisions of rec.  This possibly deletes 'rec'.  Returns rec if rec
     117             :    is the only known rooted revision, otherwise returns NULL (if rec was
     118             :    deleted).  Note that due to edge cases, revisions that are not in the
     119             :    oldest tracked slot, may not reliably get cleaned up.  (The oldest
     120             :    tracked slot always gets cleaned up, though.) */
     121             : 
     122             : static fd_funk_rec_t *
     123             : funk_gc_chain( fd_accdb_admin_v2_t * const admin,
     124           0 :                fd_funk_rec_t *       const rec ) {
     125             : 
     126           0 :   fd_accdb_lineage_t * lineage   = admin->root_lineage;
     127           0 :   fd_funk_t *          funk      = admin->v1->funk;
     128           0 :   fd_funk_rec_t *      rec_pool  = funk->rec_pool->ele;
     129           0 :   ulong                rec_max   = funk->rec_pool->ele_max;
     130           0 :   ulong                seed      = funk->rec_map->map->seed;
     131           0 :   ulong                chain_cnt = funk->rec_map->map->chain_cnt;
     132           0 :   ulong                root_slot = lineage->fork[0].ul[0];
     133             : 
     134           0 :   ulong hash      = fd_funk_rec_map_key_hash( &rec->pair, seed );
     135           0 :   ulong chain_idx = (hash & (chain_cnt-1UL) );
     136             : 
     137             :   /* Lock rec_map chain */
     138             : 
     139           0 :   int lock_err = fd_funk_rec_map_iter_lock( funk->rec_map, &chain_idx, 1UL, FD_MAP_FLAG_BLOCKING );
     140           0 :   if( FD_UNLIKELY( lock_err!=FD_MAP_SUCCESS ) ) {
     141           0 :     FD_LOG_CRIT(( "fd_funk_rec_map_iter_lock failed (%i-%s)", lock_err, fd_map_strerror( lock_err ) ));
     142           0 :   }
     143             : 
     144           0 :   fd_funk_rec_map_shmem_private_chain_t * chain =
     145           0 :       fd_funk_rec_map_shmem_private_chain( funk->rec_map->map, 0UL ) + chain_idx;
     146           0 :   ulong ver =
     147           0 :       fd_funk_rec_map_private_vcnt_ver( FD_VOLATILE_CONST( chain->ver_cnt ) );
     148           0 :   FD_CRIT( ver&1UL, "chain is not locked" );
     149             : 
     150             :   /* Walk map chain */
     151             : 
     152           0 :   fd_funk_rec_t * found_rec = NULL;
     153           0 :   uint *          pnext     = &chain->head_cidx;
     154           0 :   uint            cur       = *pnext;
     155           0 :   ulong           chain_len = 0UL;
     156           0 :   ulong           iter      = 0UL;
     157           0 :   while( cur!=FD_FUNK_REC_IDX_NULL ) {
     158           0 :     if( FD_UNLIKELY( iter++ > rec_max ) ) FD_LOG_CRIT(( "cycle detected in rec_map chain %lu", chain_idx ));
     159             : 
     160             :     /* Is this node garbage? */
     161             : 
     162           0 :     fd_funk_rec_t * node = &funk->rec_pool->ele[ cur ];
     163           0 :     if( FD_UNLIKELY( cur==node->map_next ) ) FD_LOG_CRIT(( "accdb corruption detected: cycle in rec_map chain %lu", chain_idx ));
     164           0 :     cur = node->map_next;
     165           0 :     if( !fd_funk_rec_key_eq( rec->pair.key, node->pair.key ) ) goto retain;
     166           0 :     if( node->pair.xid->ul[0]>root_slot ) goto retain;
     167           0 :     if( !found_rec ) {
     168           0 :       found_rec = node;
     169           0 :       goto retain;
     170           0 :     }
     171             : 
     172             :     /* No longer need this node */
     173             : 
     174           0 :     if( node->pair.xid->ul[0] > rec->pair.xid->ul[0] ) {
     175             :       /* If this node is newer than the to-be-deleted slot, need to
     176             :          remove it from the transaction's record list. */
     177           0 :       uint neigh_prev = node->prev_idx;
     178           0 :       uint neigh_next = node->next_idx;
     179           0 :       if( neigh_prev==FD_FUNK_REC_IDX_NULL ||
     180           0 :           neigh_next==FD_FUNK_REC_IDX_NULL ) {
     181             :         /* Node is first or last of transaction -- too bothersome to
     182             :            remove it from the transaction's record list */
     183           0 :         goto retain;
     184           0 :       }
     185           0 :       rec_pool[ neigh_next ].prev_idx = neigh_prev;
     186           0 :       rec_pool[ neigh_prev ].next_idx = neigh_next;
     187           0 :     }
     188             : 
     189             :     /* Destroy this node */
     190             : 
     191           0 :     funk_free_rec( funk, node );
     192           0 :     *pnext = cur;
     193           0 :     continue;
     194             : 
     195           0 :   retain:
     196           0 :     pnext = &node->map_next;
     197           0 :     chain_len++;
     198           0 :   }
     199             : 
     200             :   /* Unlock rec_map chain */
     201             : 
     202           0 :   FD_COMPILER_MFENCE();
     203           0 :   FD_VOLATILE( chain->ver_cnt ) =
     204           0 :       fd_funk_rec_map_private_vcnt( ver+1UL, chain_len );
     205           0 :   FD_COMPILER_MFENCE();
     206           0 :   return found_rec==rec ? found_rec : NULL;
     207           0 : }
     208             : 
     209             : /* Main algorithm */
     210             : 
     211             : fd_funk_rec_t *
     212             : fd_accdb_v2_root_batch( fd_accdb_admin_v2_t * admin,
     213           0 :                         fd_funk_rec_t *       rec0 ) {
     214           0 :   long t_start = fd_tickcount();
     215             : 
     216           0 :   fd_funk_t *           funk      = admin->v1->funk;        /* unrooted DB */
     217           0 :   fd_wksp_t *           funk_wksp = funk->wksp;             /* shm workspace containing unrooted accounts */
     218           0 :   fd_funk_rec_t *       rec_pool  = funk->rec_pool->ele;    /* funk rec arena */
     219           0 :   fd_vinyl_rq_t *       rq        = admin->vinyl_rq;        /* "request queue "*/
     220           0 :   fd_vinyl_req_pool_t * req_pool  = admin->vinyl_req_pool;  /* "request pool" */
     221           0 :   fd_wksp_t *           req_wksp  = admin->vinyl_req_wksp;  /* shm workspace containing request buffer */
     222           0 :   fd_wksp_t *           data_wksp = admin->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
     223           0 :   ulong                 link_id   = admin->vinyl_link_id;   /* vinyl client ID */
     224             : 
     225             :   /* Collect funk request batch */
     226             : 
     227           0 :   fd_funk_rec_t * recs[ FD_ACCDB_ROOT_BATCH_MAX ];
     228           0 :   ulong           rec_cnt;
     229             : 
     230           0 :   fd_funk_rec_t * next = rec0;
     231           0 :   for( rec_cnt=0UL; next && rec_cnt<FD_ACCDB_ROOT_BATCH_MAX; ) {
     232           0 :     fd_funk_rec_t * cur = next;
     233           0 :     if( fd_funk_rec_idx_is_null( cur->next_idx ) ) {
     234           0 :       next = NULL;
     235           0 :     } else {
     236           0 :       next = &rec_pool[ cur->next_idx ];
     237           0 :     }
     238           0 :     cur->prev_idx = FD_FUNK_REC_IDX_NULL;
     239           0 :     cur->next_idx = FD_FUNK_REC_IDX_NULL;
     240             : 
     241           0 :     if( funk_gc_chain( admin, cur ) ) {
     242           0 :       recs[ rec_cnt++ ] = cur;
     243           0 :     }
     244           0 :   }
     245             : 
     246             :   /* Partition batch into ACQUIRE (updates) and ERASE (deletions) */
     247             : 
     248           0 :   ulong acq_cnt = 0UL;
     249           0 :   ulong del_cnt;
     250           0 :   for( ulong i=0UL; i<rec_cnt; i++ ) {
     251           0 :     fd_account_meta_t const * meta = fd_funk_val( recs[ i ], funk_wksp );
     252           0 :     FD_CRIT( meta && recs[ i ]->val_sz>=sizeof(fd_account_meta_t), "corrupt funk_rec" );
     253           0 :     if( meta->lamports ) {
     254           0 :       fd_funk_rec_t * tmp = recs[ i ];
     255           0 :       recs[ i ]       = recs[ acq_cnt ];
     256           0 :       recs[ acq_cnt ] = tmp;
     257           0 :       acq_cnt++;
     258           0 :     }
     259           0 :   }
     260           0 :   del_cnt = rec_cnt - acq_cnt;
     261             : 
     262             :   /* Create ACQUIRE and ERASE batch requests */
     263             : 
     264           0 :   ulong            del_batch = fd_vinyl_req_pool_acquire( req_pool ); /* ERASE */
     265           0 :   ulong            acq_batch = fd_vinyl_req_pool_acquire( req_pool ); /* ACQUIRE */
     266           0 :   fd_vinyl_key_t * acq_key0  = fd_vinyl_req_batch_key( req_pool, acq_batch );
     267           0 :   fd_vinyl_key_t * del_key0  = fd_vinyl_req_batch_key( req_pool, del_batch );
     268             : 
     269           0 :   for( ulong i=0UL; i<acq_cnt; i++ ) {
     270           0 :     fd_vinyl_key_init( &acq_key0[ i ], recs[ i         ]->pair.key, 32UL );
     271           0 :   }
     272           0 :   for( ulong i=0UL; i<del_cnt; i++ ) {
     273           0 :     fd_vinyl_key_init( &del_key0[ i ], recs[ acq_cnt+i ]->pair.key, 32UL );
     274           0 :   }
     275             : 
     276             :   /* Send off ACQUIRE and ERASE requests */
     277             : 
     278           0 :   fd_vinyl_comp_t * acq_comp       = fd_vinyl_req_batch_comp     ( req_pool, acq_batch );
     279           0 :   fd_vinyl_comp_t * del_comp       = fd_vinyl_req_batch_comp     ( req_pool, del_batch );
     280           0 :   schar *           acq_err0       = fd_vinyl_req_batch_err      ( req_pool, acq_batch );
     281           0 :   schar *           del_err0       = fd_vinyl_req_batch_err      ( req_pool, del_batch );
     282           0 :   ulong *           acq_val_gaddr0 = fd_vinyl_req_batch_val_gaddr( req_pool, acq_batch );
     283             : 
     284           0 :   memset( acq_comp, 0, sizeof(fd_vinyl_comp_t) );
     285           0 :   memset( del_comp, 0, sizeof(fd_vinyl_comp_t) );
     286           0 :   for( ulong i=0UL; i<acq_cnt; i++ ) acq_err0[ i ] = 0;
     287           0 :   for( ulong i=0UL; i<del_cnt; i++ ) del_err0[ i ] = 0;
     288           0 :   for( ulong i=0UL; i<acq_cnt; i++ ) {
     289           0 :     fd_account_meta_t const * src_meta = fd_funk_val( recs[ i ], funk_wksp );
     290             : 
     291           0 :     ulong data_sz = src_meta->dlen;
     292           0 :     FD_CRIT( data_sz<=FD_RUNTIME_ACC_SZ_MAX, "oversize account record" );
     293             : 
     294           0 :     ulong val_sz = sizeof(fd_account_meta_t) + data_sz;
     295           0 :     acq_val_gaddr0[ i ]      = val_sz;
     296           0 :     admin->base.root_tot_sz += val_sz;
     297           0 :   }
     298             : 
     299           0 :   fd_vinyl_req_send_batch(
     300           0 :       rq, req_pool, req_wksp,
     301           0 :       admin->vinyl_req_id++, link_id,
     302           0 :       FD_VINYL_REQ_TYPE_ACQUIRE,
     303           0 :       FD_VINYL_REQ_FLAG_MODIFY |
     304           0 :       FD_VINYL_REQ_FLAG_IGNORE |
     305           0 :       FD_VINYL_REQ_FLAG_CREATE,
     306           0 :       acq_batch, acq_cnt
     307           0 :   );
     308           0 :   fd_vinyl_req_send_batch(
     309           0 :       rq, req_pool, req_wksp,
     310           0 :       admin->vinyl_req_id++, link_id,
     311           0 :       FD_VINYL_REQ_TYPE_ERASE,
     312           0 :       0UL,
     313           0 :       del_batch, del_cnt
     314           0 :   );
     315             : 
     316             :   /* Spin for ACQUIRE completion */
     317             : 
     318           0 :   vinyl_spin_wait( acq_comp, acq_key0, acq_err0, acq_cnt, "ACQUIRE" );
     319           0 :   long t_acquire = fd_tickcount();
     320             : 
     321             :   /* Copy back modified accounts */
     322             : 
     323           0 :   for( ulong i=0UL; i<acq_cnt; i++ ) {
     324           0 :     fd_account_meta_t const * src_meta = fd_funk_val( recs[ i ], funk_wksp );
     325             : 
     326           0 :     ulong data_sz = src_meta->dlen;
     327           0 :     ulong val_sz  = sizeof(fd_account_meta_t) + data_sz;
     328           0 :     FD_CRIT( data_sz<=FD_RUNTIME_ACC_SZ_MAX, "oversize account record" );
     329             : 
     330           0 :     fd_account_meta_t * dst_meta = fd_wksp_laddr_fast( data_wksp, acq_val_gaddr0[ i ] );
     331           0 :     fd_vinyl_info_t *   val_info = fd_vinyl_data_info( dst_meta );
     332             : 
     333           0 :     fd_memcpy( dst_meta, src_meta, val_sz );
     334           0 :     val_info->val_sz = (uint)val_sz;
     335           0 :   }
     336             : 
     337             :   /* Send off RELEASE batch request (reuse acq_batch) */
     338             : 
     339           0 :   memset( acq_comp, 0, sizeof(fd_vinyl_comp_t) );
     340           0 :   for( ulong i=0UL; i<acq_cnt; i++ ) acq_err0[ i ] = 0;
     341           0 :   fd_vinyl_req_send_batch(
     342           0 :       rq, req_pool, req_wksp,
     343           0 :       admin->vinyl_req_id++, link_id,
     344           0 :       FD_VINYL_REQ_TYPE_RELEASE,
     345           0 :       FD_VINYL_REQ_FLAG_MODIFY,
     346           0 :       acq_batch, acq_cnt
     347           0 :   );
     348           0 :   long t_copy = fd_tickcount();
     349             : 
     350             :   /* Spin for ERASE, RELEASE completions */
     351             : 
     352           0 :   vinyl_spin_wait( del_comp, del_key0, del_err0, del_cnt, "ERASE" );
     353           0 :   fd_vinyl_req_pool_release( req_pool, del_batch );
     354             : 
     355           0 :   vinyl_spin_wait( acq_comp, acq_key0, acq_err0, acq_cnt, "RELEASE" );
     356           0 :   fd_vinyl_req_pool_release( req_pool, acq_batch );
     357           0 :   long t_release = fd_tickcount();
     358             : 
     359             :   /* Remove funk records */
     360             : 
     361           0 :   for( ulong i=0UL; i<rec_cnt; i++ ) {
     362           0 :     fd_funk_xid_key_pair_t pair = recs[ i ]->pair;
     363           0 :     fd_funk_rec_query_t query[1];
     364           0 :     int rm_err = fd_funk_rec_map_remove( funk->rec_map, &pair, NULL, query, FD_MAP_FLAG_BLOCKING );
     365           0 :     if( FD_UNLIKELY( rm_err!=FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_rec_map_remove failed (%i-%s)", rm_err, fd_map_strerror( rm_err ) ));
     366           0 :     funk_free_rec( funk, recs[ i ] );
     367           0 :   }
     368           0 :   long t_gc = fd_tickcount();
     369             : 
     370             :   /* Update metrics */
     371             : 
     372           0 :   admin->base.root_cnt    += (uint)acq_cnt;
     373           0 :   admin->base.reclaim_cnt += (uint)del_cnt;
     374           0 :   admin->base.dt_vinyl    += ( t_acquire - t_start ) + ( t_release - t_copy );
     375           0 :   admin->base.dt_copy     += ( t_copy - t_acquire );
     376           0 :   admin->base.dt_gc       += ( t_gc - t_release );
     377             : 
     378           0 :   return next;
     379           0 : }

Generated by: LCOV version 1.14