LCOV - code coverage report
Current view: top level - discof/restore - fd_snaplh_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 595 0.0 %
Date: 2026-03-19 18:19:27 Functions: 0 31 0.0 %

          Line data    Source code
       1             : #include "../../disco/topo/fd_topo.h"
       2             : #include "../../disco/metrics/fd_metrics.h"
       3             : #include "../../ballet/lthash/fd_lthash_adder.h"
       4             : #include "../../util/pod/fd_pod.h"
       5             : #include "../../vinyl/io/fd_vinyl_io.h"
       6             : #include "../../vinyl/io/ur/fd_vinyl_io_ur_private.h"
       7             : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
       8             : #include "../../util/io_uring/fd_io_uring_setup.h"
       9             : #include "../../util/io_uring/fd_io_uring_register.h"
      10             : #include "../../util/io_uring/fd_io_uring.h"
      11             : #include "generated/fd_snaplh_tile_seccomp.h"
      12             : 
      13             : #include "utils/fd_ssctrl.h"
      14             : #include "utils/fd_vinyl_admin.h"
      15             : 
      16             : #include <errno.h>
      17             : #include <sys/stat.h> /* fstat */
      18             : #include <fcntl.h>    /* open  */
      19             : #include <unistd.h>   /* close */
      20             : 
      21             : #include "../../vinyl/io/ur/fd_vinyl_io_ur.h"
      22             : 
      23             : #define NAME "snaplh"
      24             : 
      25             : #define IN_CNT_MAX     (2UL)
      26           0 : #define IN_KIND_SNAPLV (0UL)
      27           0 : #define IN_KIND_SNAPWH (1UL)
      28             : 
      29             : #define VINYL_LTHASH_BLOCK_ALIGN  FD_VINYL_BSTREAM_BLOCK_SZ
      30           0 : #define VINYL_LTHASH_BLOCK_MAX_SZ (16UL<<20)
      31             : FD_STATIC_ASSERT( VINYL_LTHASH_BLOCK_MAX_SZ>(sizeof(fd_snapshot_full_account_t)+FD_VINYL_BSTREAM_BLOCK_SZ+2*VINYL_LTHASH_BLOCK_ALIGN), "VINYL_LTHASH_BLOCK_MAX_SZ" );
      32             : 
      33           0 : #define VINYL_LTHASH_RD_REQ_MAX   (32UL)
      34           0 : #define VINYL_LTHASH_IORING_DEPTH (2*VINYL_LTHASH_RD_REQ_MAX)
      35             : 
      36           0 : #define VINYL_LTHASH_IO_SPAD_MAX  (2<<20UL)
      37             : 
      38           0 : #define VINYL_LTHASH_RD_REQ_FREE  (0UL)
      39           0 : #define VINYL_LTHASH_RD_REQ_PEND  (1UL)
      40           0 : #define VINYL_LTHASH_RD_REQ_SENT  (2UL)
      41             : 
      42             : struct in_link_private {
      43             :   fd_wksp_t *  wksp;
      44             :   ulong        chunk0;
      45             :   ulong        wmark;
      46             :   ulong        mtu;
      47             :   void const * base;
      48             :   ulong *      seq_sync;  /* fseq->seq[0] */
      49             : };
      50             : typedef struct in_link_private in_link_t;
      51             : 
      52             : struct out_link_private {
      53             :   fd_wksp_t *  wksp;
      54             :   ulong        chunk0;
      55             :   ulong        wmark;
      56             :   ulong        chunk;
      57             :   ulong        mtu;
      58             : };
      59             : typedef struct out_link_private out_link_t;
      60             : 
      61             : struct fd_snaplh_tile {
      62             :   uint state;
      63             :   int  full;
      64             : 
      65             :   ulong seed;
      66             :   ulong lthash_tile_cnt;
      67             :   ulong lthash_tile_idx;
      68             :   ulong lthash_tile_add_cnt;
      69             :   ulong lthash_tile_sub_cnt;
      70             :   ulong lthash_tile_add_idx;
      71             :   ulong lthash_tile_sub_idx;
      72             :   ulong pairs_seen;
      73             :   ulong lthash_req_seen;
      74             : 
      75             :   /* Database params */
      76             :   ulong const * io_seed;
      77             : 
      78             :   fd_lthash_adder_t   adder[1];
      79             :   fd_lthash_adder_t   adder_sub[1];
      80             :   uchar               data[FD_RUNTIME_ACC_SZ_MAX];
      81             : 
      82             :   fd_lthash_value_t   running_lthash;
      83             :   fd_lthash_value_t   running_lthash_sub;
      84             :   ulong               running_capitalization_add;
      85             :   ulong               running_capitalization_sub;
      86             : 
      87             :   struct {
      88             :     int               dev_fd;
      89             :     ulong             dev_sz;
      90             :     ulong             dev_base;
      91             :     void *            pair_mem;
      92             :     void *            pair_tmp;
      93             : 
      94             :     struct {
      95             :       fd_vinyl_bstream_phdr_t phdr  [VINYL_LTHASH_RD_REQ_MAX];
      96             :       fd_vinyl_io_rd_t        rd_req[VINYL_LTHASH_RD_REQ_MAX];
      97             :     } pending;
      98             :     ulong             pending_rd_req_cnt;
      99             : 
     100             :     fd_vinyl_io_t *   io;
     101             :     fd_vinyl_admin_t * admin;
     102             :   } vinyl;
     103             : 
     104             :   struct {
     105             :     struct {
     106             :       ulong accounts_hashed;
     107             :     } full;
     108             : 
     109             :     struct {
     110             :       ulong accounts_hashed;
     111             :     } incremental;
     112             :   } metrics;
     113             : 
     114             :   ulong       wh_finish_fseq;
     115             :   ulong       wh_last_in_seq;
     116             : 
     117             :   in_link_t   in[IN_CNT_MAX];
     118             :   uchar       in_kind[IN_CNT_MAX];
     119             :   out_link_t  out;
     120             : 
     121             :   int         lthash_completion_pending;
     122             :   int         fail_completion_pending;
     123             : 
     124             :   /* io_uring setup */
     125             : 
     126             :   fd_io_uring_t ioring[1];
     127             :   int           io_uring_enabled;
     128             : };
     129             : 
     130             : typedef struct fd_snaplh_tile fd_snaplh_t;
     131             : 
     132             : static inline int
     133           0 : should_shutdown( fd_snaplh_t * ctx ) {
     134           0 :   return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
     135           0 : }
     136             : 
     137             : static ulong
     138           0 : scratch_align( void ) {
     139           0 :   return alignof(fd_snaplh_t);
     140           0 : }
     141             : 
     142             : static ulong
     143           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     144           0 :   (void)tile;
     145           0 :   ulong l = FD_LAYOUT_INIT;
     146           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snaplh_t),      sizeof(fd_snaplh_t)                                );
     147           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_BLOCK_MAX_SZ                          );
     148           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_BLOCK_MAX_SZ                          );
     149           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_BLOCK_MAX_SZ                          );
     150           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ  );
     151           0 :   l = FD_LAYOUT_APPEND( l, fd_vinyl_io_ur_align(),    fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
     152           0 :   l = FD_LAYOUT_APPEND( l, fd_io_uring_shmem_align(), fd_io_uring_shmem_footprint( VINYL_LTHASH_IORING_DEPTH, VINYL_LTHASH_IORING_DEPTH ) );
     153           0 :   return FD_LAYOUT_FINI( l, alignof(fd_snaplh_t) );
     154           0 : }
     155             : 
     156             : static void
     157           0 : metrics_write( fd_snaplh_t * ctx ) {
     158           0 :   FD_MGAUGE_SET( SNAPLH, FULL_ACCOUNTS_HASHED,        ctx->metrics.full.accounts_hashed );
     159           0 :   FD_MGAUGE_SET( SNAPLH, INCREMENTAL_ACCOUNTS_HASHED, ctx->metrics.incremental.accounts_hashed );
     160           0 :   FD_MGAUGE_SET( SNAPLH, STATE,                       (ulong)(ctx->state) );
     161           0 : }
     162             : 
     163             : static inline int
     164           0 : should_hash_account( fd_snaplh_t * ctx ) {
     165           0 :   return (ctx->pairs_seen % ctx->lthash_tile_add_cnt)==ctx->lthash_tile_add_idx;
     166           0 : }
     167             : 
     168             : static inline int
     169           0 : should_process_lthash_request( fd_snaplh_t * ctx ) {
     170           0 :   return (ctx->lthash_req_seen % ctx->lthash_tile_sub_cnt)==ctx->lthash_tile_sub_idx;
     171           0 : }
     172             : 
     173             : static void
     174             : streamlined_hash( fd_snaplh_t *       restrict ctx,
     175             :                   fd_lthash_adder_t * restrict adder,
     176             :                   fd_lthash_value_t * restrict running_lthash,
     177             :                   uchar const *       restrict _pair,
     178           0 :                   int                 is_add ) {
     179           0 :   uchar const * pair = _pair;
     180           0 :   fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t const *)pair;
     181           0 :   pair += sizeof(fd_vinyl_bstream_phdr_t);
     182           0 :   fd_account_meta_t const * meta = (fd_account_meta_t const *)pair;
     183           0 :   pair += sizeof(fd_account_meta_t);
     184           0 :   uchar const * data = pair;
     185             : 
     186           0 :   ulong data_len      = meta->dlen;
     187           0 :   const char * pubkey = phdr->key.c;
     188           0 :   ulong lamports      = meta->lamports;
     189           0 :   const uchar * owner = meta->owner;
     190           0 :   uchar executable = (uchar)( !meta->executable ? 0U : 1U) ;
     191             : 
     192           0 :   if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
     193           0 :   if( FD_UNLIKELY( lamports==0UL ) ) return;
     194             : 
     195           0 :   fd_lthash_adder_push_solana_account( adder,
     196           0 :                                        running_lthash,
     197           0 :                                        pubkey,
     198           0 :                                        data,
     199           0 :                                        data_len,
     200           0 :                                        lamports,
     201           0 :                                        executable,
     202           0 :                                        owner );
     203             : 
     204           0 :   if( is_add ) ctx->running_capitalization_add += lamports;
     205           0 :   else         ctx->running_capitalization_sub += lamports;
     206             : 
     207           0 :   if( FD_LIKELY( ctx->full ) ) ctx->metrics.full.accounts_hashed++;
     208           0 :   else                         ctx->metrics.incremental.accounts_hashed++;
     209           0 : }
     210             : 
     211             : static void
     212             : handle_vinyl_lthash_request_bd( fd_snaplh_t *             ctx,
     213             :                                 ulong                     seq,
     214           0 :                                 fd_vinyl_bstream_phdr_t * acc_hdr ) {
     215             : 
     216             :   /* The bd version is blocking, therefore ctx->pending is not used. */
     217           0 :   ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
     218             : 
     219           0 :   ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
     220           0 :   ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
     221             : 
     222             :   /* dev_seq shows where the seq is physically located in device. */
     223           0 :   ulong dev_seq  = ( seq + ctx->vinyl.dev_base ) % ctx->vinyl.dev_sz;
     224           0 :   ulong rd_off   = fd_ulong_align_dn( dev_seq, FD_VINYL_BSTREAM_BLOCK_SZ );
     225           0 :   ulong pair_off = (dev_seq - rd_off);
     226           0 :   ulong rd_sz    = fd_ulong_align_up( pair_off + pair_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
     227           0 :   FD_TEST( rd_sz < VINYL_LTHASH_BLOCK_MAX_SZ );
     228             : 
     229           0 :   uchar * pair = ((uchar*)ctx->vinyl.pair_mem) + pair_off;
     230           0 :   fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)pair;
     231             : 
     232           0 :   for(;;) {
     233           0 :     ulong sz    = rd_sz;
     234           0 :     ulong rsz   = fd_ulong_min( rd_sz, ctx->vinyl.dev_sz - rd_off );
     235           0 :     uchar * dst = ctx->vinyl.pair_mem;
     236           0 :     uchar * tmp = ctx->vinyl.pair_tmp;
     237             : 
     238           0 :     bd_read( ctx->vinyl.dev_fd, rd_off, dst, rsz );
     239           0 :     sz -= rsz;
     240           0 :     if( FD_UNLIKELY( sz ) ) {
     241             :       /* When the dev wraps around, the dev_base needs to be skipped.
     242             :          This means: increase the size multiple of the alignment,
     243             :          read into a temporary buffer, and memcpy into the dst at the
     244             :          correct offset. */
     245           0 :       bd_read( ctx->vinyl.dev_fd, 0, tmp, sz + FD_VINYL_BSTREAM_BLOCK_SZ );
     246           0 :       fd_memcpy( dst + rsz, tmp + ctx->vinyl.dev_base, sz );
     247           0 :     }
     248             : 
     249           0 :     if( FD_LIKELY( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) ) ) {
     250             : 
     251             :       /* test bstream pair integrity hashes */
     252           0 :       int test = !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz );
     253           0 :       if( FD_LIKELY( test ) ) break;
     254           0 :     }
     255           0 :     FD_LOG_WARNING(( "phdr mismatch! this should not happen under bstream_seq, continue ..." ));
     256           0 :     FD_SPIN_PAUSE();
     257           0 :   }
     258             : 
     259           0 :   streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair, 0 );
     260           0 : }
     261             : 
     262             : FD_FN_UNUSED static inline ulong
     263           0 : rd_req_ctx_get_idx( ulong rd_req_ctx ) {
     264           0 :   return ( rd_req_ctx >>  0 ) & ((1UL<<32)-1UL);
     265           0 : }
     266             : 
     267             : FD_FN_UNUSED static inline ulong
     268           0 : rd_req_ctx_get_status( ulong rd_req_ctx ) {
     269           0 :   return ( rd_req_ctx >> 32 ) & ((1UL<<32)-1UL);
     270           0 : }
     271             : 
     272             : FD_FN_UNUSED static inline void
     273             : rd_req_ctx_into_parts( ulong   rd_req_ctx,
     274             :                        ulong * idx,
     275           0 :                        ulong * status ) {
     276           0 :   *idx    = rd_req_ctx_get_idx( rd_req_ctx );
     277           0 :   *status = rd_req_ctx_get_status( rd_req_ctx );
     278           0 : }
     279             : 
     280             : FD_FN_UNUSED static inline ulong
     281             : rd_req_ctx_from_parts( ulong idx,
     282           0 :                        ulong status ) {
     283           0 :   return ( idx & ((1UL<<32)-1UL) ) | ( status << 32 );
     284           0 : }
     285             : 
     286             : FD_FN_UNUSED static inline ulong
     287             : rd_req_ctx_update_status( ulong rd_req_ctx,
     288           0 :                           ulong status ) {
     289           0 :   return rd_req_ctx_from_parts( rd_req_ctx_get_idx( rd_req_ctx ), status );
     290           0 : }
     291             : 
     292             : static void
     293             : handle_vinyl_lthash_compute_from_rd_req( fd_snaplh_t *      ctx,
     294           0 :                                          fd_vinyl_io_rd_t * rd_req ) {
     295           0 :   ulong idx = rd_req_ctx_get_idx( rd_req->ctx );
     296             : 
     297           0 :   fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)rd_req->dst;
     298           0 :   fd_vinyl_bstream_phdr_t * acc_hdr = &ctx->vinyl.pending.phdr[ idx ];
     299             : 
     300             :   /* test the retrieved header (it must mach the request) */
     301           0 :   FD_TEST( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) );
     302             : 
     303           0 :   ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
     304           0 :   ulong   seq     = rd_req->seq;
     305           0 :   uchar * pair    = (uchar*)rd_req->dst;
     306           0 :   ulong   pair_sz = rd_req->sz;
     307             : 
     308             :   /* test the bstream pair integrity hashes */
     309           0 :   FD_TEST( !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz ) );
     310             : 
     311           0 :   streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair, 0 );
     312           0 : }
     313             : 
     314             : /* Process next read completion */
     315             : 
     316             : static inline ulong
     317           0 : consume_available_cqe( fd_snaplh_t * ctx ) {
     318           0 :   if( FD_LIKELY( !ctx->vinyl.pending_rd_req_cnt ) ) return 0UL;
     319           0 :   if( FD_UNLIKELY( !ctx->io_uring_enabled ) ) return 0UL;
     320           0 :   if( !fd_io_uring_cq_ready( ctx->ioring->cq ) ) return 0UL;
     321             : 
     322             :   /* At this point, there is at least one unconsumed CQE */
     323             : 
     324           0 :   fd_vinyl_io_rd_t * rd_req = NULL;
     325           0 :   if( FD_LIKELY( fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, 0/*non blocking*/ )==FD_VINYL_SUCCESS ) ) {
     326           0 :     handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
     327           0 :     rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
     328           0 :     rd_req->seq = ULONG_MAX;
     329           0 :     rd_req->sz  = 0UL;
     330           0 :     ctx->vinyl.pending_rd_req_cnt--;
     331           0 :     return 1UL;
     332           0 :   }
     333           0 :   return 0UL;
     334           0 : }
     335             : 
     336             : static void
     337             : handle_vinyl_lthash_request_ur( fd_snaplh_t *             ctx,
     338             :                                 ulong                     seq,
     339           0 :                                 fd_vinyl_bstream_phdr_t * acc_hdr ) {
     340             :   /* Find a free slot */
     341           0 :   ulong free_i = ULONG_MAX;
     342           0 :   if( FD_LIKELY( ctx->vinyl.pending_rd_req_cnt<VINYL_LTHASH_RD_REQ_MAX ) ) {
     343           0 :     for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
     344           0 :       fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
     345           0 :       if( FD_UNLIKELY( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE ) ) {
     346           0 :         free_i = i;
     347           0 :         break;
     348           0 :       }
     349           0 :     }
     350           0 :   } else {
     351           0 :     fd_vinyl_io_rd_t * rd_req = NULL;
     352           0 :     fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
     353           0 :     FD_TEST( rd_req!=NULL );
     354           0 :     handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
     355           0 :     rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
     356           0 :     rd_req->seq = ULONG_MAX;
     357           0 :     rd_req->sz  = 0UL;
     358           0 :     free_i      = rd_req_ctx_get_idx( rd_req->ctx );
     359           0 :     ctx->vinyl.pending_rd_req_cnt--;
     360           0 :   }
     361           0 :   FD_CRIT( free_i<VINYL_LTHASH_RD_REQ_MAX, "read request free index exceeds max value" );
     362             : 
     363             :   /* Populate the empty slot and submit */
     364           0 :   fd_vinyl_bstream_phdr_t * in_phdr = &ctx->vinyl.pending.phdr[ free_i ];
     365           0 :   *in_phdr = *acc_hdr;
     366           0 :   ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
     367           0 :   ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
     368             : 
     369             :   /* Fixup io addressable range */
     370           0 :   fd_vinyl_io_t * io = ctx->vinyl.io;
     371           0 :   io->seq_past    = fd_ulong_align_dn( seq,         FD_VINYL_BSTREAM_BLOCK_SZ );
     372           0 :   io->seq_present = fd_ulong_align_up( seq+pair_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
     373           0 :   if( io->type==FD_VINYL_IO_TYPE_UR ) {
     374           0 :     fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io;
     375           0 :     ur->seq_clean = ur->seq_cache = ur->seq_write = io->seq_present;
     376           0 :   }
     377             : 
     378           0 :   fd_vinyl_io_rd_t * rd_req  = &ctx->vinyl.pending.rd_req[ free_i ];
     379           0 :   rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_PEND );
     380           0 :   rd_req->seq = seq;
     381           0 :   rd_req->sz  = pair_sz;
     382           0 :   fd_vinyl_io_read( ctx->vinyl.io, rd_req );
     383           0 :   rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_SENT );
     384           0 :   ctx->vinyl.pending_rd_req_cnt++;
     385           0 : }
     386             : 
     387             : static void
     388           0 : handle_vinyl_lthash_request_ur_consume_all( fd_snaplh_t * ctx ) {
     389           0 :   while( ctx->vinyl.pending_rd_req_cnt ) {
     390           0 :     fd_vinyl_io_rd_t * rd_req = NULL;
     391           0 :     fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
     392           0 :     FD_TEST( rd_req!=NULL );
     393           0 :     handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
     394           0 :     rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
     395           0 :     rd_req->seq = ULONG_MAX;
     396           0 :     rd_req->sz  = 0UL;
     397           0 :     ctx->vinyl.pending_rd_req_cnt--;
     398           0 :   }
     399           0 :   FD_CRIT( !ctx->vinyl.pending_rd_req_cnt, "pending read requests count not zero" );
     400           0 :   for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
     401           0 :     fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
     402           0 :     FD_CRIT( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE, "pending request status is not free" );
     403           0 :   }
     404           0 : }
     405             : 
     406             : static void
     407             : handle_lthash_completion( fd_snaplh_t * ctx,
     408           0 :                           fd_stem_context_t * stem ) {
     409           0 :   if( FD_LIKELY( !ctx->lthash_completion_pending ) ) return;
     410             : 
     411           0 :   if( fd_seq_inc( ctx->wh_last_in_seq, 1UL )>=ctx->wh_finish_fseq ) {
     412           0 :     fd_lthash_adder_flush( ctx->adder, &ctx->running_lthash );
     413           0 :     fd_lthash_adder_flush( ctx->adder_sub, &ctx->running_lthash_sub );
     414           0 :     fd_lthash_sub( &ctx->running_lthash, &ctx->running_lthash_sub );
     415           0 :     fd_ssctrl_hash_result_t * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
     416           0 :     fd_memcpy( out->lthash.bytes, &ctx->running_lthash, sizeof(fd_lthash_value_t) );
     417           0 :     long capitalization_add = fd_long_if( ctx->running_capitalization_add>LONG_MAX, LONG_MAX, (long)ctx->running_capitalization_add );
     418           0 :     long capitalization_sub = fd_long_if( ctx->running_capitalization_sub>LONG_MAX, LONG_MAX, (long)ctx->running_capitalization_sub );
     419           0 :     if( FD_UNLIKELY( capitalization_add==LONG_MAX ) ) {
     420           0 :       FD_LOG_ERR(( "capitalization overflow detected: add=%lu", ctx->running_capitalization_add ));
     421           0 :     }
     422           0 :     if( FD_UNLIKELY( capitalization_sub==LONG_MAX ) ) {
     423           0 :       FD_LOG_ERR(( "capitalization overflow detected: sub=%lu", ctx->running_capitalization_sub ));
     424           0 :     }
     425           0 :     out->capitalization = capitalization_add - capitalization_sub;
     426           0 :     fd_stem_publish( stem, 0UL, FD_SNAPSHOT_HASH_MSG_RESULT_ADD, ctx->out.chunk, sizeof(fd_ssctrl_hash_result_t), 0UL, 0UL, 0UL );
     427           0 :     ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, sizeof(fd_ssctrl_hash_result_t), ctx->out.chunk0, ctx->out.wmark );
     428           0 :     ctx->lthash_completion_pending = 0;
     429           0 :   }
     430           0 : }
     431             : 
     432             : static void
     433             : handle_fail_completion( fd_snaplh_t * ctx,
     434           0 :                   fd_stem_context_t * stem ) {
     435           0 :   if( FD_LIKELY( !ctx->fail_completion_pending ) ) return;
     436             : 
     437           0 :   if( fd_seq_inc( ctx->wh_last_in_seq, 1UL )>=ctx->wh_finish_fseq ) {
     438           0 :     fd_lthash_adder_flush( ctx->adder, &ctx->running_lthash );
     439           0 :     fd_lthash_adder_flush( ctx->adder_sub, &ctx->running_lthash_sub );
     440           0 :     fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     441           0 :     ctx->fail_completion_pending = 0;
     442           0 :   }
     443           0 : }
     444             : 
     445             : static void
     446             : before_credit( fd_snaplh_t *       ctx,
     447             :                fd_stem_context_t * stem FD_PARAM_UNUSED,
     448           0 :                int *               charge_busy ) {
     449           0 :   *charge_busy = !!consume_available_cqe( ctx );
     450           0 : }
     451             : 
     452             : static void
     453             : handle_wh_data_frag( fd_snaplh_t * ctx,
     454             :                      ulong         in_idx,
     455             :                      ulong         chunk,      /* compressed input pointer */
     456             :                      ulong         sz_comp,    /* compressed input size */
     457           0 :                      fd_stem_context_t * stem ) {
     458           0 :   FD_CRIT( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH, "incorrect in kind" );
     459             : 
     460           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     461             :     /* skip all wh data frags when in error state. */
     462           0 :     return;
     463           0 :   }
     464           0 :   if( FD_UNLIKELY( ctx->fail_completion_pending ) ) {
     465             :     /* handle_fail_completion may succeed (complete) either when the
     466             :        control frag that triggers it is received (conditional upon
     467             :        having no pending wh data frags) or after all wh data frags have
     468             :        been received and processed.  Once the fail control message
     469             :        is received, the state transitions into idle. */
     470           0 :     handle_fail_completion( ctx, stem );
     471           0 :     return;
     472           0 :   }
     473             : 
     474           0 :   uchar const * rem    = fd_chunk_to_laddr_const( ctx->in[ in_idx ].base, chunk );
     475           0 :   ulong         rem_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
     476           0 :   FD_CRIT( fd_ulong_is_aligned( (ulong)rem, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
     477           0 :   FD_CRIT( fd_ulong_is_aligned( rem_sz, FD_VINYL_BSTREAM_BLOCK_SZ ),     "misaligned write request" );
     478             : 
     479           0 :   while( rem_sz ) {
     480           0 :     FD_CRIT( rem_sz>=FD_VINYL_BSTREAM_BLOCK_SZ, "corrupted bstream block" );
     481             : 
     482           0 :     fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t *)rem;
     483           0 :     ulong ctl      = phdr->ctl;
     484           0 :     int   ctl_type = fd_vinyl_bstream_ctl_type( ctl );
     485           0 :     switch( ctl_type ) {
     486             : 
     487           0 :       case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {
     488           0 :         ulong val_esz = fd_vinyl_bstream_ctl_sz( ctl );
     489           0 :         ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
     490           0 :         if( FD_LIKELY( should_hash_account( ctx ) ) ) {
     491           0 :           uchar * pair = ctx->vinyl.pair_mem;
     492           0 :           fd_memcpy( pair, rem, pair_sz );
     493           0 :           streamlined_hash( ctx, ctx->adder, &ctx->running_lthash, pair, 1 );
     494           0 :         }
     495           0 :         rem    += pair_sz;
     496           0 :         rem_sz -= pair_sz;
     497           0 :         ctx->pairs_seen++;
     498           0 :         break;
     499           0 :       }
     500             : 
     501           0 :       case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {
     502           0 :         rem    += FD_VINYL_BSTREAM_BLOCK_SZ;
     503           0 :         rem_sz -= FD_VINYL_BSTREAM_BLOCK_SZ;
     504           0 :         break;
     505           0 :       }
     506             : 
     507           0 :       default:
     508           0 :         FD_LOG_CRIT(( "unexpected vinyl bstream block ctl %016lx for %s snapshot",
     509           0 :                       ctl, ctx->full?"full":"incremental" ));
     510           0 :     }
     511           0 :   }
     512             : 
     513           0 :   if( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) {
     514             :     /* handle_lthash_completion may succeed (complete) either when the
     515             :        control frag that triggers it is received (conditional upon
     516             :        having no pending wh data frags) or after all wh data frags have
     517             :        been received and processed. */
     518           0 :     handle_lthash_completion( ctx, stem );
     519           0 :   }
     520           0 : }
     521             : 
     522             : static void
     523             : handle_lv_data_frag( fd_snaplh_t * ctx,
     524             :                      ulong         in_idx,
     525             :                      ulong         sig,
     526           0 :                      ulong         chunk ) { /* compressed input pointer */
     527             : 
     528           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     529             :     /* skip all lv data frags when in error state. */
     530           0 :     return;
     531           0 :   }
     532           0 :   if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING  ) ) {
     533           0 :     FD_LOG_ERR(( "received snaplv data frag %s (%lu) in state %s (%lu)",
     534           0 :                  fd_ssctrl_msg_ctrl_str( sig ), sig,
     535           0 :                  fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
     536           0 :     return;
     537           0 :   }
     538             : 
     539           0 :   if( FD_LIKELY( should_process_lthash_request( ctx ) ) ) {
     540           0 :     uchar const * indata = fd_chunk_to_laddr_const( ctx->in[ in_idx ].wksp, chunk );
     541           0 :     ulong seq;
     542           0 :     fd_vinyl_bstream_phdr_t acc_hdr[1];
     543           0 :     memcpy( &seq,    indata, sizeof(ulong) );
     544           0 :     memcpy( acc_hdr, indata + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
     545           0 :     if( FD_LIKELY( ctx->io_uring_enabled ) ) {
     546           0 :       handle_vinyl_lthash_request_ur( ctx, seq, acc_hdr );
     547           0 :     } else {
     548           0 :       handle_vinyl_lthash_request_bd( ctx, seq, acc_hdr );
     549           0 :     }
     550           0 :   }
     551           0 :   ctx->lthash_req_seen++;
     552           0 : }
     553             : 
     554             : static inline ulong
     555             : tsorig_tspub_to_fseq( ulong tsorig,
     556           0 :                       ulong tspub ) {
     557           0 :   return (tspub<<32 ) | tsorig;
     558           0 : }
     559             : 
     560             : static void
     561             : handle_control_frag( fd_snaplh_t * ctx,
     562             :                      ulong         sig,
     563             :                      ulong         tsorig,
     564             :                      ulong         tspub,
     565           0 :                     fd_stem_context_t * stem  ) {
     566           0 :   if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
     567             :     /* Control messages move along the snapshot load pipeline.  Since
     568             :        error conditions can be triggered by any tile in the pipeline,
     569             :        it is possible to be in error state and still receive otherwise
     570             :        valid messages.  Only a fail message can revert this. */
     571           0 :     return;
     572           0 :   };
     573             : 
     574           0 :   switch( sig ) {
     575           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
     576           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
     577           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     578           0 :       ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
     579           0 :       ctx->full  = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
     580           0 :       fd_lthash_zero( &ctx->running_lthash );
     581           0 :       fd_lthash_zero( &ctx->running_lthash_sub );
     582           0 :       fd_lthash_adder_new( ctx->adder );
     583           0 :       fd_lthash_adder_new( ctx->adder_sub );
     584           0 :       ctx->running_capitalization_add = 0UL;
     585           0 :       ctx->running_capitalization_sub = 0UL;
     586           0 :       break;
     587           0 :     }
     588             : 
     589           0 :     case FD_SNAPSHOT_MSG_CTRL_FINI: {
     590           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
     591           0 :       ctx->state = FD_SNAPSHOT_STATE_FINISHING;
     592           0 :       ctx->wh_finish_fseq = tsorig_tspub_to_fseq( tsorig, tspub );
     593           0 :       if( FD_LIKELY( ctx->io_uring_enabled ) ) {
     594           0 :         handle_vinyl_lthash_request_ur_consume_all( ctx );
     595           0 :       }
     596           0 :       ctx->lthash_completion_pending = 1;
     597             :       /* handle_lthash_completion may succeed (complete) either here
     598             :          (if there are no pending wh data frags) or after all wh data
     599             :          frags have been received and processed. */
     600           0 :       handle_lthash_completion( ctx, stem );
     601           0 :       break;
     602           0 :     }
     603             : 
     604           0 :     case FD_SNAPSHOT_MSG_CTRL_NEXT:
     605           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE: {
     606           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
     607           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     608           0 :       break;
     609           0 :     }
     610             : 
     611           0 :     case FD_SNAPSHOT_MSG_CTRL_ERROR: {
     612           0 :       FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     613           0 :       ctx->state = FD_SNAPSHOT_STATE_ERROR;
     614           0 :       break;
     615           0 :     }
     616             : 
     617           0 :     case FD_SNAPSHOT_MSG_CTRL_FAIL: {
     618           0 :       FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     619           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     620           0 :       ctx->wh_finish_fseq = tsorig_tspub_to_fseq( tsorig, tspub );
     621           0 :       if( FD_LIKELY( ctx->io_uring_enabled ) ) {
     622           0 :         handle_vinyl_lthash_request_ur_consume_all( ctx );
     623           0 :       }
     624           0 :       ctx->fail_completion_pending = 1;
     625             :       /* handle_fail_completion may succeed (complete) either here (if
     626             :          there are no pending wh data frags) or after all wh data frags
     627             :          have been received and processed. */
     628           0 :       handle_fail_completion( ctx, stem );
     629           0 :       break;
     630           0 :     }
     631             : 
     632           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
     633           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     634           0 :       ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
     635           0 :       break;
     636             : 
     637           0 :     default: {
     638           0 :       FD_LOG_ERR(( "received unexpected control frag %s (%lu) in state %s (%lu)",
     639           0 :                    fd_ssctrl_msg_ctrl_str( sig ), sig,
     640           0 :                    fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
     641           0 :       break;
     642           0 :     }
     643           0 :   }
     644           0 : }
     645             : 
     646             : static inline int
     647             : returnable_frag( fd_snaplh_t *       ctx,
     648             :                  ulong               in_idx,
     649             :                  ulong               seq,
     650             :                  ulong               sig,
     651             :                  ulong               chunk,
     652             :                  ulong               sz,
     653             :                  ulong               ctl,
     654             :                  ulong               tsorig,
     655             :                  ulong               tspub,
     656           0 :                  fd_stem_context_t * stem ) {
     657           0 :   (void)sz; (void)ctl;
     658           0 :   FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     659             : 
     660           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) )          handle_wh_data_frag( ctx, in_idx, chunk, tsorig, stem );
     661           0 :   else if( FD_UNLIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) handle_lv_data_frag( ctx, in_idx, sig, chunk );
     662           0 :   else                                                               handle_control_frag( ctx, sig, tsorig, tspub, stem );
     663             : 
     664             :   /* Because fd_stem may not return flow control credits fast enough,
     665             :      always update fseq (consumer progress) here. */
     666           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) ) {
     667           0 :     ctx->wh_last_in_seq = seq;
     668           0 :     fd_fseq_update( ctx->in[ in_idx ].seq_sync, fd_seq_inc( seq, 1UL ) );
     669           0 :   }
     670             : 
     671           0 :   return 0;
     672           0 : }
     673             : 
     674             : static ulong
     675             : populate_allowed_fds( fd_topo_t      const * topo FD_PARAM_UNUSED,
     676             :                       fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     677             :                       ulong                  out_fds_cnt,
     678           0 :                       int *                  out_fds ) {
     679           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "incorrect out_fds_cnt %lu", out_fds_cnt ));
     680             : 
     681           0 :   ulong out_cnt = 0;
     682           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     683           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     684           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     685           0 :   }
     686             : 
     687           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     688           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     689           0 :   fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
     690             : 
     691           0 :   out_fds[ out_cnt++ ] = ctx->vinyl.dev_fd;
     692             : 
     693           0 :   if( FD_LIKELY( ctx->ioring->ioring_fd>=0 ) ) {
     694           0 :     out_fds[ out_cnt++ ] = ctx->ioring->ioring_fd;
     695           0 :   }
     696             : 
     697           0 :   return out_cnt;
     698           0 : }
     699             : 
     700             : static void
     701           0 : during_housekeeping( fd_snaplh_t * ctx ) {
     702             : 
     703             :   /* Service io_uring instance */
     704             : 
     705           0 :   if( FD_LIKELY( ctx->io_uring_enabled ) ) {
     706           0 :     uint sq_drops = fd_io_uring_sq_dropped( ctx->ioring->sq );
     707           0 :     if( FD_UNLIKELY( sq_drops ) ) {
     708           0 :       FD_LOG_CRIT(( "kernel io_uring dropped I/O requests, cannot continue (sq_dropped=%u)", sq_drops ));
     709           0 :     }
     710             : 
     711           0 :     uint cq_drops = fd_io_uring_cq_overflow( ctx->ioring->cq );
     712           0 :     if( FD_UNLIKELY( cq_drops ) ) {
     713           0 :       FD_LOG_CRIT(( "kernel io_uring dropped I/O completions, cannot continue (cq_overflow=%u)", cq_drops ));
     714           0 :     }
     715           0 :   }
     716             : 
     717           0 : }
     718             : 
     719             : static ulong
     720             : populate_allowed_seccomp( fd_topo_t const *      topo,
     721             :                           fd_topo_tile_t const * tile,
     722             :                           ulong                  out_cnt,
     723           0 :                           struct sock_filter *   out ) {
     724           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     725           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     726           0 :   fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
     727             : 
     728           0 :   populate_sock_filter_policy_fd_snaplh_tile( out_cnt, out,
     729           0 :       (uint)fd_log_private_logfile_fd(),
     730           0 :       (uint)ctx->vinyl.dev_fd,
     731           0 :       (uint)ctx->ioring->ioring_fd /* possibly -1 */ );
     732           0 :   return sock_filter_policy_fd_snaplh_tile_instr_cnt;
     733           0 : }
     734             : 
     735             : static fd_vinyl_io_t *
     736             : snaplh_io_uring_init( fd_snaplh_t * ctx,
     737             :                       void *        uring_shmem,
     738             :                       void *        vinyl_io_ur_mem,
     739           0 :                       int           dev_fd ) {
     740           0 :   ulong const uring_depth = VINYL_LTHASH_IORING_DEPTH;
     741           0 :   fd_io_uring_params_t params[1];
     742           0 :   fd_io_uring_params_init( params, (uint)uring_depth );
     743             : 
     744           0 :   if( FD_UNLIKELY( !fd_io_uring_init_shmem( ctx->ioring, params, uring_shmem, uring_depth, uring_depth ) ) ) {
     745           0 :     FD_LOG_ERR(( "fd_io_uring_init_shmem failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     746           0 :   }
     747           0 :   fd_io_uring_t * ioring = ctx->ioring;
     748             : 
     749           0 :   if( FD_UNLIKELY( fd_io_uring_register_files( ioring->ioring_fd, &dev_fd, 1 )<0 ) ) {
     750           0 :     FD_LOG_ERR(( "io_uring_register_files failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     751           0 :   }
     752             : 
     753           0 :   fd_io_uring_restriction_t res[3] = {
     754           0 :     { .opcode    = FD_IORING_RESTRICTION_SQE_OP,
     755           0 :       .sqe_op    = IORING_OP_READ },
     756           0 :     { .opcode    = FD_IORING_RESTRICTION_SQE_FLAGS_REQUIRED,
     757           0 :       .sqe_flags = IOSQE_FIXED_FILE },
     758           0 :     { .opcode    = FD_IORING_RESTRICTION_SQE_FLAGS_ALLOWED,
     759           0 :       .sqe_flags = 0 }
     760           0 :   };
     761           0 :   if( FD_UNLIKELY( fd_io_uring_register_restrictions( ioring->ioring_fd, res, 3U )<0 ) ) {
     762           0 :     FD_LOG_ERR(( "io_uring_register_restrictions failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     763           0 :   }
     764             : 
     765           0 :   if( FD_UNLIKELY( fd_io_uring_enable_rings( ioring->ioring_fd )<0 ) ) {
     766           0 :     FD_LOG_ERR(( "io_uring_enable_rings failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     767           0 :   }
     768             : 
     769           0 :   ulong align = fd_vinyl_io_ur_align();
     770           0 :   FD_TEST( fd_ulong_is_pow2( align ) );
     771             : 
     772           0 :   ulong footprint = fd_vinyl_io_ur_footprint( VINYL_LTHASH_IO_SPAD_MAX );
     773           0 :   FD_TEST( fd_ulong_is_aligned( footprint, align ) );
     774             : 
     775             :   /* Before invoking fd_vinyl_io_ur_init, the sync block must be
     776             :      already available.  Although in principle one could keep
     777             :      calling fd_vinyl_io_ur_init until it returns !=NULL, doing this
     778             :      would log uncessary (and misleading) warnings. */
     779           0 :   FD_LOG_INFO(( "waiting for account database creation" ));
     780           0 :   for(;;) {
     781           0 :     fd_vinyl_bstream_block_t block[1];
     782           0 :     ulong dev_sync = 0UL; /* Use the beginning of the file for the sync block */
     783           0 :     bd_read( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
     784           0 :     int type = fd_vinyl_bstream_ctl_type( block->sync.ctl );
     785           0 :     if( FD_UNLIKELY( type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC ) ) continue;
     786           0 :     ulong io_seed = block->sync.hash_trail;
     787           0 :     if( FD_LIKELY( !fd_vinyl_bstream_block_test( io_seed, block ) ) ) break;
     788           0 :     fd_log_sleep( 1e6 ); /* 1ms */
     789           0 :   }
     790           0 :   FD_LOG_INFO(( "found valid account database sync block, attaching ..." ));
     791             : 
     792           0 :   fd_vinyl_io_t * io = fd_vinyl_io_ur_init( vinyl_io_ur_mem, VINYL_LTHASH_IO_SPAD_MAX, dev_fd, ioring );
     793           0 :   if( FD_UNLIKELY( !io ) ) FD_LOG_ERR(( "vinyl_io_ur_init failed" ));
     794           0 :   return io;
     795           0 : }
     796             : 
     797             : static void
     798             : privileged_init( fd_topo_t *      topo,
     799           0 :                  fd_topo_tile_t * tile ) {
     800           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     801             : 
     802           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     803           0 :   fd_snaplh_t * ctx  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t),      sizeof(fd_snaplh_t)                                );
     804           0 :   void * pair_mem    = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_BLOCK_MAX_SZ                          ); (void)pair_mem;
     805           0 :   void * pair_tmp    = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_BLOCK_MAX_SZ                          ); (void)pair_tmp;
     806           0 :   void * rd_req_mem  = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ  ); (void)rd_req_mem;
     807           0 :   void * uring_mem   = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_io_ur_align(),    fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
     808           0 :   void * uring_shmem = FD_SCRATCH_ALLOC_APPEND( l, fd_io_uring_shmem_align(), fd_io_uring_shmem_footprint( VINYL_LTHASH_IORING_DEPTH, VINYL_LTHASH_IORING_DEPTH ) );
     809             : 
     810           0 :   FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
     811             : 
     812             :   /* Set up io_bd dependencies */
     813             : 
     814           0 :   char const * bstream_path = tile->snaplh.vinyl_path;
     815             :   /* Note: it would be possible to use O_DIRECT, but it would require
     816             :      VINYL_LTHASH_BLOCK_ALIGN to be 4096UL, which substantially
     817             :      increases the read overhead, making it slower (keep in mind that
     818             :      a rather large subset of mainnet accounts typically fits inside
     819             :      one FD_VINYL_BSTREAM_BLOCK_SZ. */
     820           0 :   int dev_fd = open( bstream_path, O_RDONLY|O_CLOEXEC, 0444 );
     821           0 :   if( FD_UNLIKELY( dev_fd<0 ) ) {
     822           0 :     FD_LOG_ERR(( "open(%s,O_RDONLY|O_CLOEXEC, 0444) failed (%i-%s)",
     823           0 :                  bstream_path, errno, fd_io_strerror( errno ) ));
     824           0 :   }
     825             : 
     826           0 :   struct stat st;
     827           0 :   if( FD_UNLIKELY( 0!=fstat( dev_fd, &st ) ) ) FD_LOG_ERR(( "fstat(%s) failed (%i-%s)", bstream_path, errno, strerror( errno ) ));
     828             : 
     829           0 :   ctx->vinyl.dev_fd  = dev_fd;
     830           0 :   ulong bstream_sz   = (ulong)st.st_size;
     831           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( bstream_sz, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
     832           0 :     FD_LOG_ERR(( "vinyl file %s has misaligned size (%lu bytes)", bstream_path, bstream_sz ));
     833           0 :   }
     834           0 :   ctx->vinyl.dev_sz   = bstream_sz;
     835           0 :   ctx->vinyl.dev_base = FD_VINYL_BSTREAM_BLOCK_SZ;
     836             : 
     837           0 :   ctx->vinyl.io = NULL;
     838           0 :   ctx->ioring->ioring_fd = -1;
     839             : 
     840           0 :   if( FD_LIKELY( tile->snaplh.io_uring_enabled ) ) {
     841           0 :     ctx->vinyl.io = snaplh_io_uring_init( ctx, uring_shmem, uring_mem, dev_fd );
     842           0 :   }
     843           0 :   ctx->io_uring_enabled = tile->snaplh.io_uring_enabled;
     844           0 : }
     845             : 
     846             : static void
     847             : unprivileged_init( fd_topo_t *      topo,
     848           0 :                    fd_topo_tile_t * tile ) {
     849           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     850             : 
     851           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     852           0 :   fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t),     sizeof(fd_snaplh_t)                               );
     853           0 :   void *   pair_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                         );
     854           0 :   void *   pair_tmp = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                         );
     855           0 :   void * rd_req_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ );
     856             : 
     857           0 :   FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
     858             : 
     859           0 :   ctx->vinyl.pair_mem = pair_mem;
     860           0 :   ctx->vinyl.pair_tmp = pair_tmp;
     861             : 
     862           0 :   if( FD_UNLIKELY( tile->in_cnt!=IN_CNT_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu", tile->in_cnt, IN_CNT_MAX ));
     863           0 :   if( FD_UNLIKELY( tile->out_cnt!=1UL       ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1",  tile->out_cnt            ));
     864             : 
     865           0 :   ctx->io_seed = NULL;
     866             : 
     867           0 :   for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
     868           0 :     fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
     869           0 :     fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
     870           0 :     if( FD_LIKELY( 0==strcmp( in_link->name, "snaplv_lh" ) ) ) {
     871           0 :       ctx->in[ i ].wksp     = in_wksp->wksp;
     872           0 :       ctx->in[ i ].chunk0   = fd_dcache_compact_chunk0( ctx->in[ i ].wksp, in_link->dcache );
     873           0 :       ctx->in[ i ].wmark    = fd_dcache_compact_wmark( ctx->in[ i ].wksp, in_link->dcache, in_link->mtu );
     874           0 :       ctx->in[ i ].mtu      = in_link->mtu;
     875           0 :       ctx->in[ i ].base     = NULL;
     876           0 :       ctx->in[ i ].seq_sync = NULL;
     877           0 :       ctx->in_kind[ i ]     = IN_KIND_SNAPLV;
     878           0 :     } else if( FD_LIKELY( 0==strcmp( in_link->name, "snapwh_wr" ) ) ) {
     879           0 :       ctx->in[ i ].wksp     = in_wksp->wksp;
     880           0 :       ctx->in[ i ].chunk0   = 0;
     881           0 :       ctx->in[ i ].wmark    = 0;
     882           0 :       ctx->in[ i ].mtu      = 0;
     883           0 :       ctx->in[ i ].base     = fd_dcache_join( fd_topo_obj_laddr( topo, tile->snaplh.dcache_obj_id ) );
     884           0 :       ctx->in[ i ].seq_sync = tile->in_link_fseq[ i ];
     885           0 :       ctx->wh_last_in_seq   = fd_fseq_query( tile->in_link_fseq[ i ] );
     886           0 :       ctx->in_kind[ i ]     = IN_KIND_SNAPWH;
     887           0 :       ctx->io_seed          = (ulong const *)fd_dcache_app_laddr_const( ctx->in[ i ].base );
     888           0 :       FD_TEST( ctx->in[ i ].base );
     889           0 :     } else {
     890           0 :       FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
     891           0 :     }
     892           0 :   }
     893             : 
     894           0 :   FD_TEST( ctx->io_seed );
     895             : 
     896           0 :   fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ 0UL ] ];
     897           0 :   ctx->out.wksp    = topo->workspaces[ topo->objs[ out_link->dcache_obj_id ].wksp_id ].wksp;
     898           0 :   ctx->out.chunk0  = fd_dcache_compact_chunk0( fd_wksp_containing( out_link->dcache ), out_link->dcache );
     899           0 :   ctx->out.wmark   = fd_dcache_compact_wmark ( ctx->out.wksp, out_link->dcache, out_link->mtu );
     900           0 :   ctx->out.chunk   = ctx->out.chunk0;
     901           0 :   ctx->out.mtu     = out_link->mtu;
     902           0 :   FD_TEST( 0==strcmp( out_link->name, "snaplh_lv" ) );
     903             : 
     904           0 :   fd_lthash_adder_new( ctx->adder );
     905           0 :   fd_lthash_adder_new( ctx->adder_sub );
     906             : 
     907           0 :   ctx->metrics.full.accounts_hashed        = 0UL;
     908           0 :   ctx->metrics.incremental.accounts_hashed = 0UL;
     909             : 
     910           0 :   memset( ctx->vinyl.pending.phdr,   0, sizeof(fd_vinyl_bstream_phdr_t) * VINYL_LTHASH_RD_REQ_MAX );
     911           0 :   memset( ctx->vinyl.pending.rd_req, 0, sizeof(fd_vinyl_io_rd_t)        * VINYL_LTHASH_RD_REQ_MAX );
     912           0 :   for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
     913           0 :     fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
     914           0 :     rd_req->ctx = rd_req_ctx_from_parts( i, VINYL_LTHASH_RD_REQ_FREE );
     915           0 :     rd_req->dst = NULL;
     916           0 :     if( rd_req_mem!=NULL ) {
     917           0 :       rd_req->dst = ((uchar*)rd_req_mem) + i*VINYL_LTHASH_BLOCK_MAX_SZ;
     918           0 :     }
     919           0 :   }
     920           0 :   ctx->vinyl.pending_rd_req_cnt = 0UL;
     921             : 
     922           0 :   ctx->state                   = FD_SNAPSHOT_STATE_IDLE;
     923           0 :   ctx->full                    = 1;
     924           0 :   ctx->lthash_tile_cnt         = fd_topo_tile_name_cnt( topo, "snaplh" );
     925           0 :   ctx->lthash_tile_idx         = tile->kind_id;
     926             :   /* This may seem redundant, but it provides flexibility around which
     927             :      tiles and do addition and subtraction of lthash. */
     928           0 :   ctx->lthash_tile_add_cnt     = ctx->lthash_tile_cnt;
     929           0 :   ctx->lthash_tile_sub_cnt     = ctx->lthash_tile_cnt;
     930           0 :   ctx->lthash_tile_add_idx     = ctx->lthash_tile_idx;
     931           0 :   ctx->lthash_tile_sub_idx     = ctx->lthash_tile_idx;
     932           0 :   if( ctx->lthash_tile_add_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_add_idx < ctx->lthash_tile_add_cnt );
     933           0 :   if( ctx->lthash_tile_sub_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_sub_idx < ctx->lthash_tile_sub_cnt );
     934           0 :   ctx->pairs_seen              = 0UL;
     935           0 :   ctx->lthash_req_seen         = 0UL;
     936           0 :   fd_lthash_zero( &ctx->running_lthash );
     937           0 :   fd_lthash_zero( &ctx->running_lthash_sub );
     938           0 :   ctx->running_capitalization_add = 0UL;
     939           0 :   ctx->running_capitalization_sub = 0UL;
     940             : 
     941           0 :   ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
     942           0 :   FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
     943           0 :   fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
     944           0 :   FD_TEST( vinyl_admin );
     945           0 :   ctx->vinyl.admin = vinyl_admin;
     946           0 :   for(;;) {
     947             :     /* This query can be done without the need of an rwlock. */
     948           0 :     ulong vinyl_admin_status = fd_vinyl_admin_ulong_query( &vinyl_admin->status );
     949           0 :     if( FD_LIKELY( vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_INIT_PENDING &&
     950           0 :                    vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_ERROR ) ) break;
     951           0 :     fd_log_sleep( (long)1e6 /*1ms*/ );
     952           0 :     FD_SPIN_PAUSE();
     953           0 :   }
     954             : 
     955           0 :   ctx->lthash_completion_pending = 0;
     956           0 :   ctx->fail_completion_pending   = 0;
     957           0 : }
     958             : 
     959           0 : #define STEM_BURST 1UL
     960           0 : #define STEM_LAZY  1000L
     961             : 
     962           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snaplh_t
     963           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplh_t)
     964             : 
     965             : #define STEM_CALLBACK_SHOULD_SHUTDOWN     should_shutdown
     966           0 : #define STEM_CALLBACK_METRICS_WRITE       metrics_write
     967           0 : #define STEM_CALLBACK_RETURNABLE_FRAG     returnable_frag
     968           0 : #define STEM_CALLBACK_BEFORE_CREDIT       before_credit
     969           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
     970             : 
     971             : #include "../../disco/stem/fd_stem.c"
     972             : 
     973             : fd_topo_run_tile_t fd_tile_snaplh = {
     974             :   .name                     = NAME,
     975             :   .populate_allowed_fds     = populate_allowed_fds,
     976             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     977             :   .scratch_align            = scratch_align,
     978             :   .scratch_footprint        = scratch_footprint,
     979             :   .privileged_init          = privileged_init,
     980             :   .unprivileged_init        = unprivileged_init,
     981             :   .run                      = stem_run,
     982             : };
     983             : 
     984             : #undef NAME

Generated by: LCOV version 1.14