LCOV - code coverage report
Current view: top level - discof/restore - fd_snaplv_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 461 0.0 %
Date: 2026-03-19 18:19:27 Functions: 0 21 0.0 %

          Line data    Source code
       1             : #include "fd_snaplv_tile_private.h"
       2             : #include "../../disco/topo/fd_topo.h"
       3             : #include "../../disco/metrics/fd_metrics.h"
       4             : #include "../../ballet/lthash/fd_lthash.h"
       5             : #include "../../util/pod/fd_pod.h"
       6             : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
       7             : #include "generated/fd_snaplv_tile_seccomp.h"
       8             : 
       9             : #include "utils/fd_ssctrl.h"
      10             : #include "utils/fd_vinyl_admin.h"
      11             : 
      12             : #define NAME "snaplv"
      13             : 
      14           0 : #define IN_KIND_SNAPWM (0)
      15           0 : #define IN_KIND_SNAPLH (1)
      16             : #define MAX_IN_LINKS   (16UL) /* at least 1 snapwm and FD_SNAPSHOT_MAX_SNAPLH_TILES */
      17             : 
      18             : #define OUT_LINK_CNT (3UL)
      19           0 : #define OUT_LINK_LH  (0)
      20           0 : #define OUT_LINK_CT  (1)
      21             : #define OUT_LINK_WR  (2)
      22             : 
      23             : struct out_link {
      24             :   ulong       idx;
      25             :   fd_wksp_t * mem;
      26             :   ulong       chunk0;
      27             :   ulong       wmark;
      28             :   ulong       chunk;
      29             : };
      30             : typedef struct out_link out_link_t;
      31             : 
      32             : struct fd_snaplv_tile {
      33             :   uint                state;
      34             :   int                 full;
      35             : 
      36             :   ulong               num_hash_tiles;
      37             :   ulong               num_write_tiles;
      38             : 
      39             :   uchar               in_kind[MAX_IN_LINKS];
      40             :   ulong               adder_in_offset;
      41             : 
      42             :   out_link_t          out_link[OUT_LINK_CNT];
      43             : 
      44             :   long running_capitalization;
      45             :   long dup_capitalization;
      46             :   ulong manifest_capitalization;
      47             : 
      48             :   struct {
      49             :     ulong             bstream_seq_last;
      50             :     struct {
      51             :       int                     active[FD_SNAPLV_DUP_PENDING_CNT_MAX];
      52             :       ulong                   seq   [FD_SNAPLV_DUP_PENDING_CNT_MAX];
      53             :       fd_vinyl_bstream_phdr_t phdr  [FD_SNAPLV_DUP_PENDING_CNT_MAX];
      54             :     } pending;
      55             :     ulong             pending_cnt;
      56             :     fd_vinyl_admin_t *  admin;
      57             :   } vinyl;
      58             : 
      59             :   struct {
      60             :     fd_lthash_value_t expected_lthash;
      61             :     fd_lthash_value_t calculated_lthash;
      62             :     ulong             received_lthashes;
      63             :     ulong             ack_sig;
      64             :     int               awaiting_results;
      65             :     int               hash_check_done;
      66             :   } hash_accum;
      67             : 
      68             :   struct {
      69             :     ulong exp_sig;
      70             :     ulong ack_cnt;
      71             :     int   wait;
      72             :   } fail;
      73             : 
      74             :   struct {
      75             :     fd_lthash_value_t full_lthash;
      76             :     long              capitalization;
      77             :   } recovery;
      78             : 
      79             :   struct {
      80             :     struct {
      81             :       ulong           duplicate_accounts_hashed;
      82             :     } full;
      83             : 
      84             :     struct {
      85             :       ulong           duplicate_accounts_hashed;
      86             :     } incremental;
      87             :   } metrics;
      88             : 
      89             :   struct {
      90             :     fd_wksp_t *       wksp;
      91             :     ulong             chunk0;
      92             :     ulong             wmark;
      93             :     ulong             mtu;
      94             :     ulong             pos;
      95             :   } in;
      96             : 
      97             :   struct {
      98             :     fd_wksp_t *       wksp;
      99             :     ulong             chunk0;
     100             :     ulong             wmark;
     101             :     ulong             mtu;
     102             :   } adder_in[FD_SNAPSHOT_MAX_SNAPLH_TILES];
     103             : };
     104             : 
     105             : typedef struct fd_snaplv_tile fd_snaplv_t;
     106             : 
     107             : static inline int
     108           0 : should_shutdown( fd_snaplv_t * ctx ) {
     109           0 :   return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
     110           0 : }
     111             : 
     112             : static ulong
     113           0 : scratch_align( void ) {
     114           0 :   return alignof(fd_snaplv_t);
     115           0 : }
     116             : 
     117             : static ulong
     118           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     119           0 :   (void)tile;
     120           0 :   ulong l = FD_LAYOUT_INIT;
     121           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snaplv_t), sizeof(fd_snaplv_t) );
     122           0 :   return FD_LAYOUT_FINI( l, alignof(fd_snaplv_t) );
     123           0 : }
     124             : 
     125             : static void
     126           0 : metrics_write( fd_snaplv_t * ctx ) {
     127           0 :   (void)ctx;
     128           0 :   FD_MGAUGE_SET( SNAPLV, FULL_DUPLICATE_ACCOUNTS_HASHED,        ctx->metrics.full.duplicate_accounts_hashed );
     129           0 :   FD_MGAUGE_SET( SNAPLV, INCREMENTAL_DUPLICATE_ACCOUNTS_HASHED, ctx->metrics.incremental.duplicate_accounts_hashed );
     130           0 :   FD_MGAUGE_SET( SNAPLV, STATE,                                 (ulong)(ctx->state) );
     131           0 : }
     132             : 
     133             : static void
     134             : transition_malformed( fd_snaplv_t *  ctx,
     135           0 :                       fd_stem_context_t * stem ) {
     136           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) return;
     137           0 :   ctx->state = FD_SNAPSHOT_STATE_ERROR;
     138           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
     139           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
     140           0 : }
     141             : 
     142             : static void
     143             : handle_vinyl_lthash_request( fd_snaplv_t *             ctx,
     144             :                              fd_stem_context_t *       stem,
     145             :                              ulong                     seq,
     146           0 :                              fd_vinyl_bstream_phdr_t * acc_hdr ) {
     147             : 
     148           0 :   out_link_t * o_link = &ctx->out_link[ OUT_LINK_LH ];
     149           0 :   uchar * data = fd_chunk_to_laddr( o_link->mem, o_link->chunk );
     150           0 :   memcpy( data, &seq, sizeof(ulong) );
     151           0 :   memcpy( data + sizeof(ulong), acc_hdr, sizeof(fd_vinyl_bstream_phdr_t) );
     152           0 :   ulong data_sz = sizeof(ulong)+sizeof(fd_vinyl_bstream_phdr_t);
     153           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH, o_link->chunk, data_sz, 0UL, 0UL, 0UL );
     154           0 :   o_link->chunk = fd_dcache_compact_next( o_link->chunk, data_sz, o_link->chunk0, o_link->wmark );
     155             : 
     156           0 :   if( ctx->full ) ctx->metrics.full.duplicate_accounts_hashed++;
     157           0 :   else            ctx->metrics.incremental.duplicate_accounts_hashed++;
     158           0 : }
     159             : 
     160             : static inline void
     161           0 : handle_vinyl_lthash_seq_sync( fd_snaplv_t * ctx ) {
     162           0 :   ulong bstream_seq_min = ULONG_MAX;
     163           0 :   for( ulong i=0; i<ctx->num_write_tiles; i++ ) {
     164             :     /* There is a way to avoid a lock here: every wr_seq[i] is a ulong,
     165             :        each assigned to a unique write tile, and it works the same way
     166             :        as a stem's fseq or an mcache's seq.  Therefore, from the point
     167             :        of view snaplv, it can directly read them at any time.
     168             :        Only snapwm is allowed to overwrite the wr_seq array during the
     169             :        initialization of a full/incr snapshot, but it does so after
     170             :        synchronizing with the write tiles (making sure that they have
     171             :        already completed all pending writes) and before instructing
     172             :        snaplv to start processing the snapshot. */
     173           0 :     ulong bstream_seq = fd_vinyl_admin_ulong_query( &ctx->vinyl.admin->wr_seq[ i ] );
     174           0 :     bstream_seq_min = fd_ulong_min( bstream_seq_min, bstream_seq );
     175           0 :   }
     176           0 :   ctx->vinyl.bstream_seq_last = bstream_seq_min;
     177           0 : }
     178             : 
     179             : static inline int
     180             : handle_vinyl_lthash_seq_check_fast( fd_snaplv_t * ctx,
     181           0 :                                     ulong              seq ) {
     182           0 :   return seq < ctx->vinyl.bstream_seq_last;
     183           0 : }
     184             : 
     185             : static inline void
     186             : handle_vinyl_lthash_seq_check_until_match( fd_snaplv_t * ctx,
     187             :                                            ulong         seq,
     188           0 :                                            int           do_sleep ) {
     189           0 :   for(;;) {
     190           0 :     if( handle_vinyl_lthash_seq_check_fast( ctx, seq ) ) break;
     191           0 :     FD_SPIN_PAUSE();
     192           0 :     if( do_sleep ) fd_log_sleep( (long)1e3 ); /* 1 microsecond */
     193           0 :     handle_vinyl_lthash_seq_sync( ctx );
     194           0 :   }
     195           0 : }
     196             : 
     197             : static inline void
     198             : handle_vinyl_lthash_request_drain_all( fd_snaplv_t *       ctx,
     199           0 :                                        fd_stem_context_t * stem ) {
     200           0 :   for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
     201           0 :     if( !ctx->vinyl.pending.active[ i ] ) continue;
     202           0 :     handle_vinyl_lthash_seq_check_until_match( ctx, ctx->vinyl.pending.seq[ i ], 1/*do_sleep*/ );
     203           0 :     handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ i ], &ctx->vinyl.pending.phdr[ i ] );
     204           0 :     ctx->vinyl.pending.active[ i ] = 0;
     205           0 :     ctx->vinyl.pending_cnt--;
     206           0 :   }
     207           0 :   FD_TEST( !ctx->vinyl.pending_cnt );
     208           0 : }
     209             : 
     210             : static inline void
     211             : handle_vinyl_lthash_pending_list( fd_snaplv_t *        ctx,
     212           0 :                                   fd_stem_context_t *  stem ) {
     213             :   /* Try to consume as many pending requests as possible. */
     214           0 :   for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
     215           0 :     if( FD_LIKELY( !ctx->vinyl.pending.active[ i ] ) ) continue;
     216           0 :     if( handle_vinyl_lthash_seq_check_fast( ctx, ctx->vinyl.pending.seq[ i ] ) ) {
     217           0 :       handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ i ], &ctx->vinyl.pending.phdr[ i ] );
     218           0 :       ctx->vinyl.pending.active[ i ] = 0;
     219           0 :       ctx->vinyl.pending_cnt--;
     220           0 :     }
     221           0 :   }
     222           0 : }
     223             : 
     224             : static void
     225             : handle_data_frag( fd_snaplv_t *       ctx,
     226             :                   fd_stem_context_t * stem,
     227             :                   ulong               sig,
     228             :                   ulong               chunk,
     229             :                   ulong               sz,
     230           0 :                   ulong               tspub ) {
     231           0 :   (void)chunk; (void)sz;
     232             : 
     233           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     234             :     /* skip all data frags when in error state. */
     235           0 :     return;
     236           0 :   }
     237           0 :   if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
     238           0 :     FD_LOG_ERR(( "received data frag %s (%lu) while in unexpected state %s (%u)",
     239           0 :                  fd_ssctrl_msg_ctrl_str( sig ), sig,
     240           0 :                  fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     241           0 :     return;
     242           0 :   }
     243           0 :   if( FD_UNLIKELY( sig!=FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) {
     244           0 :     FD_LOG_ERR(( "received incorrect data frag %s (%lu) in state %s (%u)",
     245           0 :                  fd_ssctrl_msg_ctrl_str( sig ), sig,
     246           0 :                  fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     247           0 :     return;
     248           0 :   }
     249             : 
     250           0 :   uchar const * in_data = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     251             : 
     252           0 :   ulong const acc_sz    = sizeof(ulong)+sizeof(fd_vinyl_bstream_phdr_t);
     253           0 :   ulong const batch_sz  = sz;
     254           0 :   ulong const batch_cnt = tspub;
     255           0 :   if( FD_UNLIKELY( batch_cnt>FD_SNAPLV_DUP_BATCH_IN_CNT_MAX ) ) {
     256           0 :     FD_LOG_CRIT(( "batch count %lu exceeds batch count max %lu", batch_cnt, FD_SNAPLV_DUP_BATCH_IN_CNT_MAX ));
     257           0 :   }
     258           0 :   if( FD_UNLIKELY( (batch_cnt*acc_sz)!=batch_sz ) ) {
     259           0 :     FD_LOG_CRIT(( "batch count %lu with account size %lu does not match batch size %lu", batch_cnt, acc_sz, batch_sz ));
     260           0 :   }
     261             : 
     262           0 :   for( ulong acc_i=0UL; acc_i<batch_cnt; acc_i++ ) {
     263             : 
     264             :     /* move in_data pointer forward */
     265           0 :     uchar const * acc_data = in_data;
     266           0 :     in_data += acc_sz;
     267             : 
     268           0 :     ulong acc_data_seq = fd_ulong_load_8( acc_data );
     269             : 
     270           0 :     if( FD_LIKELY( handle_vinyl_lthash_seq_check_fast( ctx, acc_data_seq ) ) ) {
     271             :       /* The request can be processed immediately, skipping the pending list. */
     272           0 :       fd_vinyl_bstream_phdr_t acc_data_phdr[1];
     273           0 :       memcpy( acc_data_phdr, acc_data + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
     274           0 :       handle_vinyl_lthash_request( ctx, stem, acc_data_seq, acc_data_phdr );
     275           0 :       continue;
     276           0 :     }
     277             : 
     278             :     /* Find an empty slot in the pending list. */
     279           0 :     ulong seq_min_i = ULONG_MAX;
     280           0 :     ulong seq_min   = ULONG_MAX;
     281           0 :     ulong free_i    = ULONG_MAX;
     282           0 :     if( FD_UNLIKELY( ctx->vinyl.pending_cnt==FD_SNAPLV_DUP_PENDING_CNT_MAX ) ) {
     283             :       /* an entry must be consumed to free a slot */
     284           0 :       for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
     285           0 :         ulong seq = ctx->vinyl.pending.seq[ i ];
     286           0 :         seq_min_i = fd_ulong_if( seq_min > seq, i, seq_min_i );
     287           0 :         seq_min   = fd_ulong_min( seq_min, seq );
     288           0 :       }
     289           0 :       handle_vinyl_lthash_seq_check_until_match( ctx, ctx->vinyl.pending.seq[ seq_min_i ], 1/*do_sleep*/ );
     290           0 :       handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ seq_min_i ], &ctx->vinyl.pending.phdr[ seq_min_i ] );
     291           0 :       ctx->vinyl.pending.active[ seq_min_i ] = 0;
     292           0 :       ctx->vinyl.pending_cnt--;
     293           0 :       free_i = seq_min_i;
     294           0 :     } else {
     295             :       /* Pick a free slot. */
     296           0 :       free_i = 0UL;
     297           0 :       for( ; free_i<FD_SNAPLV_DUP_PENDING_CNT_MAX; free_i++ ) {
     298           0 :         if( !ctx->vinyl.pending.active[ free_i ] ) break;
     299           0 :       }
     300           0 :     }
     301             : 
     302             :     /* Populate the free slot. */
     303           0 :     ctx->vinyl.pending.seq[ free_i ] = acc_data_seq;
     304           0 :     memcpy( &ctx->vinyl.pending.phdr[ free_i ], acc_data + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
     305           0 :     ctx->vinyl.pending.active[ free_i ] = 1;
     306           0 :     ctx->vinyl.pending_cnt++;
     307           0 :   }
     308           0 : }
     309             : 
     310             : static void
     311             : handle_control_frag( fd_snaplv_t *       ctx,
     312             :                      fd_stem_context_t * stem,
     313             :                      ulong               sig,
     314             :                      ulong               in_idx,
     315             :                      ulong               tsorig,
     316           0 :                      ulong               tspub ) {
     317           0 :   (void)in_idx;
     318             : 
     319           0 :   if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLH ) {
     320           0 :     if( FD_UNLIKELY( !ctx->fail.wait ) ) {
     321           0 :       FD_LOG_CRIT(( "received unexpected control frag %s (%lu) from snaplh in state %s (%u)",
     322           0 :                     fd_ssctrl_msg_ctrl_str( sig ), sig,
     323           0 :                     fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     324           0 :     }
     325           0 :     if( FD_UNLIKELY( sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) ) {
     326           0 :       FD_LOG_CRIT(( "received incorrect control frag %s (%lu) from snaplh in state %s (%u)",
     327           0 :                     fd_ssctrl_msg_ctrl_str( sig ), sig,
     328           0 :                     fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     329           0 :     }
     330           0 :     ctx->fail.ack_cnt++;
     331           0 :     return;
     332           0 :   }
     333             : 
     334           0 :   if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
     335             :     /* Control messages move along the snapshot load pipeline.  Since
     336             :        error conditions can be triggered by any tile in the pipeline,
     337             :        it is possible to be in error state and still receive otherwise
     338             :        valid messages.  Only a fail message can revert this. */
     339           0 :     return;
     340           0 :   };
     341             : 
     342           0 :   int forward_to_ct = 1;
     343             : 
     344           0 :   switch( sig ) {
     345           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
     346           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
     347           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     348           0 :       ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
     349           0 :       ctx->full  = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
     350             : 
     351           0 :       if( sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL ) {
     352           0 :         fd_lthash_zero( &ctx->hash_accum.calculated_lthash );
     353           0 :         fd_lthash_zero( &ctx->recovery.full_lthash );
     354           0 :         ctx->running_capitalization = 0L;
     355           0 :         ctx->dup_capitalization     = 0L;
     356           0 :       } else {
     357             :         /* The lthash for the incremental snapshot is computed starting
     358             :            from the full snapshot lthash.  Since an init message may
     359             :            be received after a fail message, always start from the
     360             :            recovery value. */
     361           0 :         ctx->hash_accum.calculated_lthash = ctx->recovery.full_lthash;
     362           0 :         ctx->running_capitalization = ctx->recovery.capitalization;
     363           0 :         ctx->dup_capitalization     = 0L;
     364           0 :       }
     365             : 
     366           0 :       break;
     367           0 :     }
     368             : 
     369           0 :     case FD_SNAPSHOT_MSG_CTRL_FINI: {
     370           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
     371           0 :       ctx->state = FD_SNAPSHOT_STATE_FINISHING;
     372           0 :       ctx->hash_accum.ack_sig          = sig;
     373           0 :       ctx->hash_accum.awaiting_results = 1;
     374           0 :       handle_vinyl_lthash_request_drain_all( ctx, stem );
     375           0 :       forward_to_ct = 0;
     376           0 :       break; /* the ack is sent when all hashes are received */
     377           0 :     }
     378             : 
     379           0 :     case FD_SNAPSHOT_MSG_CTRL_NEXT:
     380           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE: {
     381           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
     382           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     383             : 
     384             :       /* back up full_lthash for future recovery. */
     385           0 :       if( sig==FD_SNAPSHOT_MSG_CTRL_NEXT ) {
     386           0 :         ctx->recovery.full_lthash    = ctx->hash_accum.calculated_lthash;
     387           0 :         ctx->recovery.capitalization = ctx->running_capitalization;
     388           0 :       }
     389           0 :       break;
     390           0 :     }
     391             : 
     392           0 :     case FD_SNAPSHOT_MSG_CTRL_ERROR: {
     393           0 :       FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     394           0 :       ctx->state = FD_SNAPSHOT_STATE_ERROR;
     395           0 :       break;
     396           0 :     }
     397             : 
     398           0 :     case FD_SNAPSHOT_MSG_CTRL_FAIL: {
     399           0 :       FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     400           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     401           0 :       ctx->fail.exp_sig = FD_SNAPSHOT_MSG_CTRL_FAIL;
     402           0 :       ctx->fail.ack_cnt = 0UL;
     403           0 :       ctx->fail.wait = 1;
     404           0 :       forward_to_ct = 0;
     405           0 :       break;
     406           0 :     }
     407             : 
     408           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: {
     409           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     410           0 :       ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
     411           0 :       break;
     412           0 :     }
     413             : 
     414           0 :     default: {
     415           0 :       FD_LOG_ERR(( "unexpected control frag %s (%lu) in state %s (%u)",
     416           0 :                    fd_ssctrl_msg_ctrl_str( sig ), sig,
     417           0 :                    fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     418           0 :       break;
     419           0 :     }
     420           0 :   }
     421             : 
     422             :   /* Forward the control message down the pipeline */
     423           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
     424           0 :   if( !forward_to_ct ) return;
     425           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
     426           0 : }
     427             : 
     428             : static void
     429             : handle_hash_frag( fd_snaplv_t *       ctx,
     430             :                   fd_stem_context_t * stem,
     431             :                   ulong               in_idx,
     432             :                   ulong               sig,
     433             :                   ulong               chunk,
     434           0 :                   ulong               sz ) {
     435           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     436             :     /* skip all hash frags when in error state. */
     437           0 :     return;
     438           0 :   }
     439           0 :   if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING &&
     440           0 :                    ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) {
     441           0 :     if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM) ) {
     442           0 :       FD_LOG_WARNING(( "received invalid data frag %s (%lu) from snapwm in state %s (%u)",
     443           0 :                        fd_ssctrl_msg_ctrl_str( sig ), sig,
     444           0 :                        fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     445           0 :       transition_malformed( ctx, stem );
     446           0 :     }
     447           0 :     return;
     448           0 :   }
     449           0 :   switch( sig ) {
     450           0 :     case FD_SNAPSHOT_HASH_MSG_RESULT_ADD: {
     451           0 :       FD_TEST( sz==sizeof(fd_ssctrl_hash_result_t) );
     452           0 :       FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLH );
     453           0 :       fd_ssctrl_hash_result_t const * result = fd_chunk_to_laddr_const( ctx->adder_in[ in_idx-ctx->adder_in_offset ].wksp, chunk );
     454           0 :       fd_lthash_add( &ctx->hash_accum.calculated_lthash, &result->lthash );
     455           0 :       ctx->running_capitalization = fd_long_sat_add( ctx->running_capitalization, result->capitalization );
     456           0 :       if( FD_UNLIKELY( ctx->running_capitalization==LONG_MAX ) ) {
     457           0 :         FD_LOG_WARNING(( "capitalization overflow, running_capitalization=%ld, result_capitalization=%ld", ctx->running_capitalization, result->capitalization ));
     458           0 :         transition_malformed( ctx, stem );
     459           0 :         return;
     460           0 :       }
     461           0 :       ctx->hash_accum.received_lthashes++;
     462           0 :       break;
     463           0 :     }
     464           0 :     case FD_SNAPSHOT_HASH_MSG_RESULT_SUB: {
     465           0 :       FD_TEST( sz==sizeof(fd_ssctrl_hash_result_t) );
     466           0 :       FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM );
     467           0 :       fd_ssctrl_hash_result_t const * result = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     468           0 :       fd_lthash_sub( &ctx->hash_accum.calculated_lthash, &result->lthash );
     469           0 :       FD_TEST( result->capitalization>=0L );
     470           0 :       ctx->dup_capitalization = fd_long_sat_add( ctx->dup_capitalization, result->capitalization );
     471           0 :       break;
     472           0 :     }
     473           0 :     case FD_SNAPSHOT_HASH_MSG_EXPECTED: {
     474           0 :       FD_TEST( sz==sizeof(fd_lthash_value_t) );
     475           0 :       FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM );
     476           0 :       fd_lthash_value_t const * result = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     477           0 :       ctx->hash_accum.expected_lthash = *result;
     478           0 :       break;
     479           0 :     }
     480           0 :     default: {
     481           0 :       FD_LOG_ERR(( "unexpected hash frag %s (%lu) in state %s (%lu)",
     482           0 :                    fd_ssctrl_msg_ctrl_str( sig ), sig,
     483           0 :                    fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
     484           0 :       break;
     485           0 :     }
     486           0 :   }
     487           0 : }
     488             : 
     489             : static inline void
     490             : handle_expected_capitalization_message( fd_snaplv_t * ctx,
     491             :                                         ulong         chunk,
     492           0 :                                         ulong         sz ) {
     493           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     494             :     /* skip all hash frags when in error state. */
     495           0 :     return;
     496           0 :   }
     497             : 
     498           0 :   if( FD_UNLIKELY( sz!=sizeof(fd_ssctrl_capitalization_t) ) ) {
     499           0 :     FD_LOG_ERR(( "unexpected msg sz %lu for sig FD_SNAPSHOT_MSG_EXP_CAPITALIZATION", sz ));
     500           0 :     return;
     501           0 :   }
     502             : 
     503           0 :   fd_ssctrl_capitalization_t const * expected_cap = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     504           0 :   ctx->manifest_capitalization = expected_cap->capitalization;
     505           0 : }
     506             : 
     507             : static inline int
     508             : returnable_frag( fd_snaplv_t *       ctx,
     509             :                  ulong               in_idx,
     510             :                  ulong               seq    FD_PARAM_UNUSED,
     511             :                  ulong               sig,
     512             :                  ulong               chunk,
     513             :                  ulong               sz,
     514             :                  ulong               ctl    FD_PARAM_UNUSED,
     515             :                  ulong               tsorig,
     516             :                  ulong               tspub,
     517           0 :                  fd_stem_context_t * stem ) {
     518           0 :   FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     519             : 
     520           0 :   if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) handle_data_frag( ctx, stem, sig, chunk, sz, tspub );
     521           0 :   else if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_RESULT_ADD ||
     522           0 :                       sig==FD_SNAPSHOT_HASH_MSG_RESULT_SUB ||
     523           0 :                       sig==FD_SNAPSHOT_HASH_MSG_EXPECTED ) )      handle_hash_frag( ctx, stem, in_idx, sig, chunk, sz );
     524           0 :   else if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_EXP_CAPITALIZATION ) ) handle_expected_capitalization_message( ctx, chunk, sz );
     525           0 :   else                                                            handle_control_frag( ctx, stem, sig, in_idx, tsorig, tspub );
     526             : 
     527           0 :   return 0;
     528           0 : }
     529             : 
     530             : static void
     531             : before_credit( fd_snaplv_t *       ctx,
     532             :                fd_stem_context_t * stem FD_PARAM_UNUSED,
     533           0 :                int *               charge_busy ) {
     534           0 :   *charge_busy = 0;
     535           0 :   handle_vinyl_lthash_seq_sync( ctx );
     536           0 : }
     537             : 
     538             : static void
     539             : after_credit( fd_snaplv_t *        ctx,
     540             :               fd_stem_context_t *  stem,
     541             :               int *                opt_poll_in FD_PARAM_UNUSED,
     542           0 :               int *                charge_busy FD_PARAM_UNUSED ) {
     543             : 
     544           0 :   handle_vinyl_lthash_pending_list( ctx, stem );
     545             : 
     546           0 :   if( FD_UNLIKELY( ctx->hash_accum.awaiting_results && ctx->hash_accum.received_lthashes==ctx->num_hash_tiles ) ) {
     547             : 
     548           0 :     ctx->hash_accum.awaiting_results  = 0;
     549           0 :     ctx->hash_accum.received_lthashes = 0UL;
     550           0 :     ctx->running_capitalization       = fd_long_sat_sub( ctx->running_capitalization, ctx->dup_capitalization );
     551           0 :     if( FD_UNLIKELY( ctx->running_capitalization==LONG_MIN ) ) {
     552           0 :       FD_LOG_WARNING(( "capitalization underflow, running_capitalization=%ld, dup_capitalization=%ld", ctx->running_capitalization, ctx->dup_capitalization ));
     553           0 :       transition_malformed( ctx, stem );
     554           0 :       return;
     555           0 :     }
     556           0 :     if( FD_UNLIKELY( ctx->running_capitalization<0L ) ) {
     557           0 :       FD_LOG_WARNING(( "computed capitalization %ld is invalid", ctx->running_capitalization ));
     558           0 :       transition_malformed( ctx, stem );
     559           0 :       return;
     560           0 :     }
     561           0 :     ulong computed_capitalization = (ulong)ctx->running_capitalization;
     562           0 :     int capitalization_match      = computed_capitalization==ctx->manifest_capitalization;
     563             : 
     564           0 :     int lthash_match = !memcmp( &ctx->hash_accum.expected_lthash, &ctx->hash_accum.calculated_lthash, sizeof(fd_lthash_value_t) );
     565             : 
     566           0 :     if( FD_UNLIKELY( !lthash_match ) ) {
     567             :       /* SnapshotError::MismatchedHash
     568             :          https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L479 */
     569           0 :       FD_LOG_WARNING(( "calculated accounts lthash %s does not match accounts lthash %s in %s snapshot manifest",
     570           0 :                         FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.calculated_lthash ),
     571           0 :                         FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.expected_lthash ),
     572           0 :                         ctx->full?"full":"incremental" ));
     573           0 :       transition_malformed( ctx, stem );
     574           0 :       return;
     575           0 :     } else {
     576           0 :       FD_LOG_INFO(( "calculated accounts lthash %s matches accounts lthash %s in %s snapshot manifest",
     577           0 :                      FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.calculated_lthash ),
     578           0 :                      FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.expected_lthash ),
     579           0 :                      ctx->full?"full":"incremental" ));
     580           0 :     }
     581             : 
     582           0 :     if( FD_UNLIKELY( !capitalization_match ) ) {
     583             :       /* SnapshotError::MismatchedCapitalization
     584             :          https://github.com/anza-xyz/agave/blob/v4.0.0-beta.2/runtime/src/snapshot_bank_utils.rs#L217 */
     585           0 :       FD_LOG_WARNING(( "%s snapshot manifest capitalization %lu does not match computed capitalization %lu",
     586           0 :                        ctx->full?"full":"incremental", ctx->manifest_capitalization, computed_capitalization ));
     587           0 :       transition_malformed( ctx, stem );
     588           0 :       return;
     589           0 :     }
     590             : 
     591           0 :     fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, ctx->hash_accum.ack_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     592           0 :   }
     593             : 
     594           0 :   if( FD_UNLIKELY( ctx->fail.wait && ctx->fail.ack_cnt==ctx->num_hash_tiles ) ) {
     595           0 :     fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, ctx->fail.exp_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     596           0 :     ctx->fail.exp_sig = 0UL;
     597           0 :     ctx->fail.ack_cnt = 0UL;
     598           0 :     ctx->fail.wait    = 0;
     599           0 :     return;
     600           0 :   }
     601           0 : }
     602             : 
     603             : static ulong
     604             : populate_allowed_fds( fd_topo_t      const * topo FD_PARAM_UNUSED,
     605             :                       fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     606             :                       ulong                  out_fds_cnt,
     607           0 :                       int *                  out_fds ) {
     608           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "unexpected out_fds_cnt %lu", out_fds_cnt ));
     609             : 
     610           0 :   ulong out_cnt = 0;
     611           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     612           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     613           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     614           0 :   }
     615           0 :   return out_cnt;
     616           0 : }
     617             : 
     618             : static ulong
     619             : populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
     620             :                           fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     621             :                           ulong                  out_cnt,
     622           0 :                           struct sock_filter *   out ) {
     623           0 :   populate_sock_filter_policy_fd_snaplv_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
     624           0 :   return sock_filter_policy_fd_snaplv_tile_instr_cnt;
     625           0 : }
     626             : 
     627             : static void
     628             : unprivileged_init( fd_topo_t *      topo,
     629           0 :                    fd_topo_tile_t * tile ) {
     630           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     631             : 
     632           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     633           0 :   fd_snaplv_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplv_t), sizeof(fd_snaplv_t)         );
     634             : 
     635           0 :   FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
     636             : 
     637           0 :   ulong expected_in_cnt = 1UL + fd_topo_tile_name_cnt( topo, "snaplh" );
     638           0 :   if( FD_UNLIKELY( tile->in_cnt!=expected_in_cnt ) )  FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu",  tile->in_cnt, expected_in_cnt ));
     639           0 :   if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
     640             : 
     641           0 :   ulong adder_idx = 0UL;
     642           0 :   for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
     643           0 :     fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
     644           0 :     fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
     645             : 
     646           0 :     if( FD_LIKELY( 0==strcmp( in_link->name, "snapwm_lv" ) ) ) {
     647           0 :       ctx->in.wksp                   = in_wksp->wksp;
     648           0 :       ctx->in.chunk0                 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
     649           0 :       ctx->in.wmark                  = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
     650           0 :       ctx->in.mtu                    = in_link->mtu;
     651           0 :       ctx->in.pos                    = 0UL;
     652           0 :       ctx->in_kind[ i ]              = IN_KIND_SNAPWM;
     653             : 
     654           0 :     } else if( FD_LIKELY( 0==strcmp( in_link->name, "snaplh_lv" ) ) ) {
     655           0 :       ctx->adder_in[ adder_idx ].wksp    = in_wksp->wksp;
     656           0 :       ctx->adder_in[ adder_idx ].chunk0  = fd_dcache_compact_chunk0( ctx->adder_in[ adder_idx ].wksp, in_link->dcache );
     657           0 :       ctx->adder_in[ adder_idx ].wmark   = fd_dcache_compact_wmark ( ctx->adder_in[ adder_idx ].wksp, in_link->dcache, in_link->mtu );
     658           0 :       ctx->adder_in[ adder_idx ].mtu     = in_link->mtu;
     659           0 :       ctx->in_kind[ i ]                  = IN_KIND_SNAPLH;
     660           0 :       if( FD_LIKELY( adder_idx==0UL ) ) ctx->adder_in_offset = i;
     661           0 :       adder_idx++;
     662             : 
     663           0 :     } else {
     664           0 :       FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
     665           0 :     }
     666           0 :   }
     667             : 
     668           0 :   ctx->vinyl.bstream_seq_last = 0UL;
     669             : 
     670           0 :   for( uint i=0U; i<(tile->out_cnt); i++ ) {
     671           0 :     fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
     672             : 
     673           0 :     if( 0==strcmp( link->name, "snaplv_ct" ) ) {
     674           0 :       out_link_t * o_link = &ctx->out_link[ OUT_LINK_CT ];
     675           0 :       o_link->idx    = i;
     676           0 :       o_link->mem    = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
     677           0 :       o_link->chunk0 = 0UL;
     678           0 :       o_link->wmark  = 0UL;
     679           0 :       o_link->chunk  = 0UL;
     680             : 
     681           0 :     } else if( 0==strcmp( link->name, "snaplv_lh" ) ) {
     682           0 :       out_link_t * o_link = &ctx->out_link[ OUT_LINK_LH ];
     683           0 :       o_link->idx    = i;
     684           0 :       o_link->mem    = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
     685           0 :       o_link->chunk0 = fd_dcache_compact_chunk0( o_link->mem, link->dcache );
     686           0 :       o_link->wmark  = fd_dcache_compact_wmark( o_link->mem, link->dcache, link->mtu );
     687           0 :       o_link->chunk  = o_link->chunk0;
     688             : 
     689           0 :     } else {
     690           0 :       FD_LOG_ERR(( "unexpected output link %s", link->name ));
     691           0 :     }
     692           0 :   }
     693             : 
     694           0 :   memset( ctx->vinyl.pending.active, 0, FD_SNAPLV_DUP_PENDING_CNT_MAX*sizeof(int) );
     695           0 :   ctx->vinyl.pending_cnt = 0;
     696             : 
     697           0 :   ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
     698           0 :   FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
     699           0 :   fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
     700           0 :   FD_TEST( vinyl_admin );
     701           0 :   ctx->vinyl.admin = vinyl_admin;
     702           0 :   for(;;) {
     703             :     /* This query can be done without the need of an rwlock. */
     704           0 :     ulong vinyl_admin_status = fd_vinyl_admin_ulong_query( &vinyl_admin->status );
     705           0 :     if( FD_LIKELY( vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_INIT_PENDING &&
     706           0 :                    vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_ERROR ) ) break;
     707           0 :     fd_log_sleep( (long)1e6 /*1ms*/ );
     708           0 :     FD_SPIN_PAUSE();
     709           0 :   }
     710             : 
     711           0 :   ctx->metrics.full.duplicate_accounts_hashed        = 0UL;
     712           0 :   ctx->metrics.incremental.duplicate_accounts_hashed = 0UL;
     713             : 
     714           0 :   ctx->state                        = FD_SNAPSHOT_STATE_IDLE;
     715           0 :   ctx->full                         = 1;
     716             : 
     717           0 :   ctx->num_hash_tiles               = fd_topo_tile_name_cnt( topo, "snaplh" );
     718           0 :   ctx->num_write_tiles              = fd_topo_tile_name_cnt( topo, "snapwr" );
     719           0 :   FD_TEST( ctx->num_write_tiles<=FD_VINYL_ADMIN_WR_SEQ_CNT_MAX );
     720             : 
     721           0 :   ctx->hash_accum.received_lthashes = 0UL;
     722           0 :   ctx->hash_accum.awaiting_results  = 0;
     723           0 :   ctx->hash_accum.hash_check_done   = 0;
     724             : 
     725           0 :   fd_lthash_zero( &ctx->hash_accum.calculated_lthash );
     726           0 :   fd_lthash_zero( &ctx->recovery.full_lthash );
     727             : 
     728           0 :   ctx->recovery.capitalization = 0L;
     729           0 :   ctx->running_capitalization  = 0L;
     730           0 :   ctx->dup_capitalization      = 0L;
     731           0 :   ctx->manifest_capitalization = 0UL;
     732             : 
     733           0 :   ctx->fail.exp_sig = 0UL;
     734           0 :   ctx->fail.ack_cnt = 0UL;
     735           0 :   ctx->fail.wait    = 0;
     736           0 : }
     737             : 
     738           0 : #define STEM_BURST (FD_SNAPLV_STEM_BURST)
     739           0 : #define STEM_LAZY  1000L
     740             : 
     741           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snaplv_t
     742           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplv_t)
     743             : 
     744             : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
     745           0 : #define STEM_CALLBACK_METRICS_WRITE   metrics_write
     746           0 : #define STEM_CALLBACK_BEFORE_CREDIT   before_credit
     747           0 : #define STEM_CALLBACK_AFTER_CREDIT    after_credit
     748           0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
     749             : 
     750             : 
     751             : #include "../../disco/stem/fd_stem.c"
     752             : 
     753             : fd_topo_run_tile_t fd_tile_snaplv = {
     754             :   .name                     = NAME,
     755             :   .populate_allowed_fds     = populate_allowed_fds,
     756             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     757             :   .scratch_align            = scratch_align,
     758             :   .scratch_footprint        = scratch_footprint,
     759             :   .unprivileged_init        = unprivileged_init,
     760             :   .run                      = stem_run,
     761             : };
     762             : 
     763             : #undef NAME

Generated by: LCOV version 1.14