LCOV - code coverage report
Current view: top level - discof/accdb - fd_accdb_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 523 0.0 %
Date: 2026-03-19 18:19:27 Functions: 0 13 0.0 %

          Line data    Source code
       1             : /* Vinyl database server (Firedancer adaptation)
       2             : 
       3             :    This implementation is a fork of src/vinyl/fd_vinyl_exec with some
       4             :    Firedancer-specific changes:
       5             :    - All clients are joined on startup
       6             :    - Some errors (invalid link_id, invalid comp_gaddr) result in hard
       7             :      crashes instead of silent drops
       8             :    - Sandboxing */
       9             : 
      10             : #define _GNU_SOURCE
      11             : #include "../../disco/topo/fd_topo.h"
      12             : #include "../../disco/metrics/fd_metrics.h"
      13             : #include "../../discof/restore/fd_snapct_tile.h"
      14             : #include "../../vinyl/fd_vinyl.h"
      15             : #include "../../vinyl/fd_vinyl_base.h"
      16             : #include "../../vinyl/io/ur/fd_vinyl_io_ur.h"
      17             : #include "../../util/pod/fd_pod_format.h"
      18             : #include "../../util/io_uring/fd_io_uring_setup.h"
      19             : #include "../../util/io_uring/fd_io_uring_register.h"
      20             : #include "generated/fd_accdb_tile_seccomp.h"
      21             : 
      22             : #include <errno.h>
      23             : #include <fcntl.h>
      24             : #include <lz4.h>
      25             : #include <sys/stat.h>
      26             : #include <linux/io_uring.h>
      27             : 
      28             : #define NAME "accdb"
      29             : #define MAX_INS 8
      30             : 
      31             : /* For io_ur backend, this controls the size of the write-back cache.
      32             :    This should be larger than the cumulative record size of all unique
      33             :    changed accounts in a slot.  The io_bd backend footprint / init calls in this file are passed the same value. */
      34           0 : #define IO_SPAD_MAX (128UL<<20) /* 128 MiB */
      35             : 
      36             : #define FD_VINYL_CLIENT_MAX (1024UL) /* Capacity of the _client array in fd_vinyl_tile_t */
      37           0 : #define FD_VINYL_REQ_MAX    (1024UL) /* Capacity of the _req array in fd_vinyl_tile_t; also the burst budget shared by all clients */
      38             : 
      39             : struct fd_vinyl_client { /* Per-client state; one entry per vinyl_rq object joined in unprivileged_init */
      40             :   fd_vinyl_rq_t * rq;        /* Channel for requests from this client (could be shared by multiple vinyl instances) */
      41             :   fd_vinyl_cq_t * cq;        /* Channel for completions from this vinyl instance to this client
      42             :                                 (could be shared by multiple receivers of completions from this vinyl instance). */
      43             :   ulong           burst_max; /* Max requests to receive from this client at a time */
      44             :   ulong           seq;       /* Sequence number of the next request to receive in the rq */
      45             :   ulong           link_id;   /* Identifies requests from this client to this vinyl instance in the rq (unique among joined clients) */
      46             :   ulong           laddr0;    /* A valid non-zero gaddr from this client maps to the vinyl instance's laddr laddr0 + gaddr ... */
      47             :   ulong           laddr1;    /* ... and thus is in (laddr0,laddr1).  A zero gaddr maps to laddr NULL. */
      48             :   ulong           quota_rem; /* Num of remaining acquisitions this client is allowed on this vinyl instance */
      49             :   ulong           quota_max; /* Max quota (quota_rem is initialized to this value) */
      50             : };
      51             : 
      52             : typedef struct fd_vinyl_client fd_vinyl_client_t;
      53             : 
      54             : /* MAP_REQ_GADDR maps a request global address gaddr to an array of
      55             :    n T's in the local address space as a T * pointer.  If the result
      56             :    is not properly aligned or the entire range does not completely fall
      57             :    within the shared region with the client, returns NULL.  Likewise,
      58             :    gaddr 0 maps to NULL.  Assumes sizeof(T)*(n) does not overflow (which
      59             :    is true as n is at most batch_cnt which is at most 2^32 and
      60             :    sizeof(T) is at most 40).  Expands to a call to fd_vinyl_laddr and
      61             :    uses the client_laddr0 / client_laddr1 in scope at the point of use. */
      62           0 : #define MAP_REQ_GADDR( gaddr, T, n ) ((T *)fd_vinyl_laddr( (gaddr), alignof(T), sizeof(T)*(n), client_laddr0, client_laddr1 ))
      63             : /* fd_vinyl_laddr returns client_laddr0+req_gaddr as a pointer if req_gaddr is non-zero, the result passes fd_ulong_is_aligned for align, and [result,result+footprint) lies within [client_laddr0,client_laddr1] without wrapping.  Returns NULL otherwise. */
      64             : FD_FN_CONST static inline void *
      65             : fd_vinyl_laddr( ulong req_gaddr,
      66             :                 ulong align,
      67             :                 ulong footprint,
      68             :                 ulong client_laddr0,
      69           0 :                 ulong client_laddr1 ) {
      70           0 :   ulong req_laddr0 = client_laddr0 + req_gaddr; /* may wrap; caught by client_laddr0<=req_laddr0 below */
      71           0 :   ulong req_laddr1 = req_laddr0    + footprint; /* may wrap; caught by req_laddr0<=req_laddr1 below */
      72           0 :   return (void *)fd_ulong_if( (!!req_gaddr) & fd_ulong_is_aligned( req_laddr0, align ) & /* all checks are combined with bitwise &, no short circuit */
      73           0 :                               (client_laddr0<=req_laddr0) & (req_laddr0<=req_laddr1) & (req_laddr1<=client_laddr1),
      74           0 :                               req_laddr0, 0UL );
      75           0 : }
      76             : 
      77             : struct fd_vinyl_tile { /* Tile context.  Placed at offset 0 of the tile scratch region (see fd_accdb_tile_layout). */
      78             : 
      79             :   /* Vinyl objects */
      80             : 
      81             :   fd_vinyl_t vinyl[1];
      82             :   void * io_mem; /* Memory handed to the vinyl io backend init (fd_vinyl_io_bd_init / fd_vinyl_io_ur_init) */
      83             : 
      84             :   /* Tile architecture */
      85             : 
      86             :   uint booted : 1;
      87             :   uint shutdown : 1;
      88             :   struct {
      89             :     ulong                  state_expected; /* Value *state must equal before the tile boots */
      90             :     ulong volatile const * state;          /* Polled in during_housekeeping; NULL means do not wait */
      91             :     ulong volatile const * pair_cnt;       /* NULL when booting from genesis */
      92             :     /* When booting from genesis only */
      93             :     struct {
      94             :       ulong                io_seed;
      95             :     } from_genesis;
      96             :   } boot;
      97             : 
      98             :   /* I/O */
      99             : 
     100             :   int   bstream_fd;      /* -1 until the bstream file is opened in privileged_init */
     101             :   ulong bstream_file_sz; /* st_size of the bstream file as reported by fstat in privileged_init */
     102             : 
     103             :   /* io_uring */
     104             : 
     105             :   fd_io_uring_t ring[1]; /* ring->ioring_fd stays -1 when the io_uring backend is not used */
     106             :   void * ioring_shmem; /* shared between kernel and user */
     107             : 
     108             :   /* Clients */
     109             : 
     110             :   fd_vinyl_client_t _client[ FD_VINYL_CLIENT_MAX ];
     111             :   ulong             client_cnt; /* Number of entries of _client in use */
     112             :   ulong             client_idx;
     113             : 
     114             :   /* Received requests */
     115             : 
     116             :   fd_vinyl_req_t _req[ FD_VINYL_REQ_MAX ];
     117             :   ulong          req_head;                 /* Requests [0,req_head)         have been processed */
     118             :   ulong          req_tail;                 /* Requests [req_head,req_tail)  are pending */
     119             :                                            /* Requests [req_tail,ULONG_MAX) have not been received */
     120             :   ulong exec_max; /* Requests to process per run loop iteration; computed in unprivileged_init */
     121             : 
     122             :   /* accum_dead_cnt is the number of dead blocks that have been
     123             :      written since the last partition block.
     124             : 
     125             :      accum_move_cnt is the number of move blocks that have been
     126             :      written since the last partition block.  (NOTE(review): no accum_move_cnt field is declared in this struct; this paragraph may be stale.)
     127             : 
     128             :      accum_garbage_cnt / sz is the number of items / bytes garbage in
     129             :      the bstream that have accumulated since the last time we compacted
     130             :      the bstream.  We use this to estimate the number of rounds of
     131             :      compaction to do in async handling. */
     132             : 
     133             :   ulong accum_dead_cnt;
     134             :   ulong accum_garbage_cnt;
     135             :   ulong accum_garbage_sz;
     136             : 
     137             :   /* Run loop state */
     138             : 
     139             :   ulong seq_part;
     140             : 
     141             :   /* Periodic syncing */
     142             : 
     143             :   long sync_next_ns; /* Initialized to fd_log_wallclock() in unprivileged_init */
     144             : 
     145             :   /* Vinyl limit on the number of pairs the meta map will accept.
     146             :      Exceeding this limit will trigger a LOG_ERR. */
     147             :   ulong pair_cnt_limit;
     148             : };
     149             : 
     150             : typedef struct fd_vinyl_tile fd_vinyl_tile_t;
     151             : 
     152             : /* Vinyl state object */
     153             : /* scratch_align returns the alignment of the tile scratch region (FD_SHMEM_HUGE_PAGE_SZ).  fd_accdb_tile_layout rounds the footprint up to this value. */
     154             : static ulong
     155           0 : scratch_align( void ) {
     156           0 :   return FD_SHMEM_HUGE_PAGE_SZ;
     157           0 : }
     158             : /* fd_accdb_tile_layout_t describes how the tile scratch region is carved up.  Offsets are relative to the start of the region (where the fd_vinyl_tile_t lives) and are filled in by fd_accdb_tile_layout. */
     159             : struct fd_accdb_tile_layout {
     160             :   ulong footprint;          /* Total size of the scratch region */
     161             :   ulong io_off;             /* Offset of the vinyl io backend memory */
     162             :   ulong io_uring_shmem_off; /* Offset of the io_uring shared memory; left 0 unless io_type is FD_VINYL_IO_TYPE_UR */
     163             :   ulong vinyl_line_off;     /* Offset of the fd_vinyl_line_t array */
     164             : };
     165             : 
     166             : typedef struct fd_accdb_tile_layout fd_accdb_tile_layout_t;
     167             : /* fd_accdb_tile_layout fills layout with the offsets of the io backend memory, the io_uring shared memory (UR io type only) and the vinyl line array within the tile scratch region, plus the total footprint.  Logs FD_LOG_CRIT on an unknown tile->accdb.io_type.  privileged_init applies these offsets to the scratch base. */
     168             : static void
     169             : fd_accdb_tile_layout( fd_accdb_tile_layout_t * layout,
     170           0 :                       fd_topo_tile_t const *   tile ) {
     171           0 :   memset( layout, 0, sizeof(fd_accdb_tile_layout_t) );
     172             : 
     173           0 :   FD_SCRATCH_ALLOC_INIT( l, NULL ); /* NULL base: the values appended below are used as offsets */
     174           0 :   ulong ctx_off = (ulong)FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_vinyl_tile_t), sizeof(fd_vinyl_tile_t) );
     175           0 :   FD_TEST( ctx_off==0UL ); /* the tile context must come first in the region */
     176             : 
     177           0 :   switch( tile->accdb.io_type ) {
     178           0 :   case FD_VINYL_IO_TYPE_UR:
     179           0 :     layout->io_off = (ulong)FD_SCRATCH_ALLOC_APPEND(
     180           0 :         l, fd_vinyl_io_ur_align(), fd_vinyl_io_ur_footprint( IO_SPAD_MAX ) );
     181           0 :     layout->io_uring_shmem_off = (ulong)FD_SCRATCH_ALLOC_APPEND(
     182           0 :         l, FD_SHMEM_HUGE_PAGE_SZ, fd_io_uring_shmem_footprint( tile->accdb.uring_depth, tile->accdb.uring_depth ) ); /* same depth for both arguments, as in vinyl_io_uring_init */
     183           0 :     break;
     184           0 :   case FD_VINYL_IO_TYPE_BD:
     185           0 :     layout->io_off = (ulong)FD_SCRATCH_ALLOC_APPEND(
     186           0 :         l, fd_vinyl_io_bd_align(), fd_vinyl_io_bd_footprint( IO_SPAD_MAX ) );
     187           0 :     break;
     188           0 :   default:
     189           0 :     FD_LOG_CRIT(( "invalid tile->accdb.io_type %d", tile->accdb.io_type ));
     190           0 :   }
     191             : 
     192           0 :   layout->vinyl_line_off = (ulong)FD_SCRATCH_ALLOC_APPEND(
     193           0 :       l, alignof(fd_vinyl_line_t), sizeof(fd_vinyl_line_t)*tile->accdb.line_max ); /* product is not overflow checked here; privileged_init checks it before calling */
     194           0 :   layout->footprint = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     195           0 : }
     196             : /* scratch_footprint returns the size of the tile scratch region for tile, as computed by fd_accdb_tile_layout. */
     197             : static ulong
     198           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     199           0 :   fd_accdb_tile_layout_t layout[1];
     200           0 :   fd_accdb_tile_layout( layout, tile );
     201           0 :   return layout->footprint;
     202           0 : }
     203             : /* populate_allowed_fds writes into out_fds the file descriptors this tile keeps: stderr, the log file (if any), the bstream file and the io_uring fd (if a ring was created).  Returns the number of entries written, at most 4.  out_fds_cnt is the capacity of out_fds; fewer than 4 is rejected with FD_LOG_ERR because up to 4 entries may be written. */
     204             : static ulong
     205             : populate_allowed_fds( fd_topo_t      const * topo,
     206             :                       fd_topo_tile_t const * tile,
     207             :                       ulong                  out_fds_cnt,
     208           0 :                       int *                  out_fds ) {
     209           0 :   if( FD_UNLIKELY( out_fds_cnt<4UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     210             : 
     211           0 :   ulong out_cnt = 0;
     212           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     213           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     214           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     215           0 :   }
     216             : 
     217           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     218           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     219           0 :   fd_vinyl_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_vinyl_tile_t), sizeof(fd_vinyl_tile_t) ); /* tile context is the first allocation in the scratch region */
     220             : 
     221           0 :   out_fds[ out_cnt++ ] = ctx->bstream_fd;
     222             : 
     223           0 :   if( ctx->ring->ioring_fd>=0 ) out_fds[ out_cnt++ ] = ctx->ring->ioring_fd; /* only present for the io_uring backend */
     224             : 
     225           0 :   return out_cnt;
     226           0 : }
     227             : /* populate_allowed_seccomp fills out (capacity out_cnt) with the generated seccomp filter for this tile, parameterized by the log file, bstream and io_uring fds, and returns the filter's instruction count. */
     228             : static ulong
     229             : populate_allowed_seccomp( fd_topo_t const *      topo,
     230             :                           fd_topo_tile_t const * tile,
     231             :                           ulong                  out_cnt,
     232           0 :                           struct sock_filter *   out ) {
     233           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     234           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     235           0 :   fd_vinyl_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_vinyl_tile_t), sizeof(fd_vinyl_tile_t) ); /* tile context is the first allocation in the scratch region */
     236             : 
     237           0 :   populate_sock_filter_policy_fd_accdb_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->bstream_fd, (uint)ctx->ring->ioring_fd ); /* an fd of -1 (e.g. no io_uring) becomes UINT_MAX after the cast */
     238           0 :   return sock_filter_policy_fd_accdb_tile_instr_cnt;
     239           0 : }
     240             : /* vinyl_io_uring_init sets up ctx->ring on ctx->ioring_shmem (uring_depth is passed for both depth arguments), registers dev_fd as the only fixed file, restricts the ring to READ / WRITE SQEs that carry IOSQE_FIXED_FILE and no other SQE flags, then enables the ring.  Any failing step is reported with FD_LOG_ERR. */
     241             : static void
     242             : vinyl_io_uring_init( fd_vinyl_tile_t * ctx,
     243             :                      uint              uring_depth,
     244           0 :                      int               dev_fd ) {
     245           0 :   fd_io_uring_params_t params[1];
     246           0 :   fd_io_uring_params_init( params, uring_depth );
     247             : 
     248             :   /* We busy poll the kernel syscall interface and use GETEVENTS.
     249             :      Therefore inhibit interrupt-driven completions. */
     250           0 :   params->flags    |= IORING_SETUP_COOP_TASKRUN;
     251           0 :   params->features |= IORING_SETUP_DEFER_TASKRUN; /* NOTE(review): an IORING_SETUP_* flag is OR'd into features rather than flags -- looks unintended; confirm (moving it to flags may also require IORING_SETUP_SINGLE_ISSUER) */
     252             : 
     253           0 :   if( FD_UNLIKELY( !fd_io_uring_init_shmem( ctx->ring, params, ctx->ioring_shmem, uring_depth, uring_depth ) ) ) {
     254           0 :     FD_LOG_ERR(( "fd_io_uring_init_shmem failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     255           0 :   }
     256             : 
     257           0 :   if( FD_UNLIKELY( fd_io_uring_register_files( ctx->ring->ioring_fd, &dev_fd, 1 )<0 ) ) { /* dev_fd becomes the single registered (fixed) file */
     258           0 :     FD_LOG_ERR(( "io_uring_register_files failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     259           0 :   }
     260             : 
     261           0 :   fd_io_uring_restriction_t res[4] = { /* allow only READ and WRITE, require fixed files, allow no other SQE flags */
     262           0 :     { .opcode    = FD_IORING_RESTRICTION_SQE_OP,
     263           0 :       .sqe_op    = IORING_OP_READ },
     264           0 :     { .opcode    = FD_IORING_RESTRICTION_SQE_OP,
     265           0 :       .sqe_op    = IORING_OP_WRITE },
     266           0 :     { .opcode    = FD_IORING_RESTRICTION_SQE_FLAGS_REQUIRED,
     267           0 :       .sqe_flags = IOSQE_FIXED_FILE },
     268           0 :     { .opcode    = FD_IORING_RESTRICTION_SQE_FLAGS_ALLOWED,
     269           0 :       .sqe_flags = 0 }
     270           0 :   };
     271           0 :   if( FD_UNLIKELY( fd_io_uring_register_restrictions( ctx->ring->ioring_fd, res, 4U )<0 ) ) {
     272           0 :     FD_LOG_ERR(( "io_uring_register_restrictions failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     273           0 :   }
     274             : 
     275             :   /* Enable rings */
     276           0 :   if( FD_UNLIKELY( fd_io_uring_enable_rings( ctx->ring->ioring_fd )<0 ) ) {
     277           0 :     FD_LOG_ERR(( "io_uring_enable_rings failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     278           0 :   }
     279           0 : }
     280             : /* privileged_init zeroes the tile context, locates the io / io_uring / line regions in the scratch region using fd_accdb_tile_layout, opens the bstream file read-write and records its size, creates the io_uring instance when the io type is UR, and draws the io seed used when booting from genesis.  Failures are reported via FD_LOG_ERR / FD_TEST. */
     281             : static void
     282             : privileged_init( fd_topo_t *      topo,
     283           0 :                  fd_topo_tile_t * tile ) {
     284           0 :   ulong line_footprint;
     285           0 :   if( FD_UNLIKELY( !tile->accdb.line_max || __builtin_umull_overflow( tile->accdb.line_max, sizeof(fd_vinyl_line_t), &line_footprint ) ) ) { /* reject zero lines and a line array size that overflows */
     286           0 :     FD_LOG_ERR(( "invalid vinyl_line_max %lu", tile->accdb.line_max ));
     287           0 :   }
     288             : 
     289           0 :   fd_vinyl_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     290           0 :   ulong ctx_laddr = (ulong)ctx;
     291             : 
     292           0 :   memset( ctx, 0, sizeof(fd_vinyl_tile_t) );
     293           0 :   ctx->bstream_fd      = -1; /* -1 marks "not open"; ioring_fd stays -1 unless the UR backend is set up below */
     294           0 :   ctx->ring->ioring_fd = -1;
     295             : 
     296           0 :   fd_accdb_tile_layout_t layout[1];
     297           0 :   fd_accdb_tile_layout( layout, tile );
     298             : 
     299           0 :   fd_vinyl_t * vinyl = ctx->vinyl;
     300           0 :   ctx->io_mem = (void *)( ctx_laddr + layout->io_off );
     301             : 
     302           0 :   if( tile->accdb.io_type==FD_VINYL_IO_TYPE_UR ) {
     303           0 :     ctx->ioring_shmem = (void *)( ctx_laddr + layout->io_uring_shmem_off );
     304           0 :   }
     305             : 
     306           0 :   fd_vinyl_line_t * _line = (void *)( ctx_laddr + layout->vinyl_line_off );
     307             : 
     308           0 :   vinyl->cnc            = NULL;
     309           0 :   vinyl->io             = NULL; /* the io backend is created later, in during_housekeeping */
     310           0 :   vinyl->line           = (fd_vinyl_line_t *)_line;
     311           0 :   vinyl->line_footprint = line_footprint;
     312             : 
     313             :   /* FIXME use O_DIRECT? */
     314           0 :   FD_LOG_INFO(( "opening vinyl database file %s", tile->accdb.bstream_path ));
     315           0 :   int dev_fd = open( tile->accdb.bstream_path, O_RDWR|O_CLOEXEC );
     316           0 :   if( FD_UNLIKELY( dev_fd<0 ) ) FD_LOG_ERR(( "open(%s,O_RDWR|O_CLOEXEC) failed (%i-%s)", tile->accdb.bstream_path, errno, fd_io_strerror( errno ) ));
     317             : 
     318           0 :   ctx->bstream_fd = dev_fd;
     319             : 
     320           0 :   struct stat st;
     321           0 :   if( FD_UNLIKELY( fstat( dev_fd, &st )<0 ) ) {
     322           0 :     FD_LOG_ERR(( "fstat on bstream fd failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     323           0 :   }
     324           0 :   ctx->bstream_file_sz = (ulong)st.st_size;
     325             : 
     326           0 :   int io_type = tile->accdb.io_type;
     327           0 :   if( io_type==FD_VINYL_IO_TYPE_UR ) {
     328           0 :     vinyl_io_uring_init( ctx, tile->accdb.uring_depth, dev_fd );
     329           0 :   } else if( io_type!=FD_VINYL_IO_TYPE_BD ) { /* BD needs no extra setup here */
     330           0 :     FD_LOG_ERR(( "Unsupported vinyl io_type %d", io_type ));
     331           0 :   }
     332             : 
     333             :   /* Only needed when booting from genesis */
     334           0 :   FD_TEST( fd_rng_secure( &ctx->boot.from_genesis.io_seed, 8UL ) );
     335           0 : }
     336             : /* unprivileged_init finishes tile setup on top of the state left by privileged_init: it sizes and resets the vinyl meta / data / line state, selects the boot mode (genesis when the topology has no snapwm tile, otherwise wait on the snapwm tile's metrics) and joins every vinyl_rq object used by the tile as a client, updating exec_max as clients are added. */
     337             : static void
     338             : unprivileged_init( fd_topo_t *      topo,
     339           0 :                    fd_topo_tile_t * tile ) {
     340             : 
     341           0 :   fd_vinyl_tile_t * ctx   = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     342           0 :   fd_vinyl_t *      vinyl = ctx->vinyl;
     343             : 
     344           0 :   ctx->sync_next_ns = fd_log_wallclock();
     345             : 
     346           0 :   void * _meta = fd_topo_obj_laddr( topo, tile->accdb.meta_map_obj_id  );
     347           0 :   void * _ele  = fd_topo_obj_laddr( topo, tile->accdb.meta_pool_obj_id );
     348           0 :   void * _obj  = fd_topo_obj_laddr( topo, tile->accdb.data_obj_id      );
     349             : 
     350           0 : # define TEST( c ) do { if( FD_UNLIKELY( !(c) ) ) { FD_LOG_ERR(( "FAIL: %s", #c )); } } while(0)
     351             : 
     352           0 :   vinyl->cnc_footprint  = 0UL;
     353           0 :   vinyl->meta_footprint = topo->objs[ tile->accdb.meta_map_obj_id  ].footprint;
     354           0 :   vinyl->ele_footprint  = topo->objs[ tile->accdb.meta_pool_obj_id ].footprint;
     355           0 :   vinyl->obj_footprint  = topo->objs[ tile->accdb.data_obj_id      ].footprint;
     356             : 
     357           0 :   void * obj_laddr0 = fd_wksp_containing( _obj );
     358           0 :   ulong part_thresh =    64UL<<20;
     359           0 :   ulong gc_thresh   =   128UL<<20;
     360           0 :   int   gc_eager    =           2;
     361             : 
     362           0 :   ulong ele_max  = fd_ulong_pow2_dn( vinyl->ele_footprint / sizeof( fd_vinyl_meta_ele_t ) );
     363             : 
     364           0 :   ulong pair_max = ele_max - 1UL;
     365           0 :   ulong line_cnt = fd_ulong_min( vinyl->line_footprint / sizeof( fd_vinyl_line_t ), pair_max ); /* line_footprint was set in privileged_init */
     366             : 
     367           0 :   TEST( (3UL<=line_cnt) & (line_cnt<=FD_VINYL_LINE_MAX) );
     368             : 
     369             :   /* seed is arb */
     370             : 
     371             :   /* part_thresh is arb */
     372             : 
     373             :   /* gc_thresh is arb */
     374             : 
     375           0 :   TEST( (-1<=gc_eager) & (gc_eager<=63) );
     376             : 
     377           0 :   vinyl->line_cnt = line_cnt;
     378           0 :   vinyl->pair_max = pair_max;
     379             : 
     380           0 :   vinyl->part_thresh  = part_thresh;
     381           0 :   vinyl->gc_thresh    = gc_thresh;
     382           0 :   vinyl->gc_eager     = gc_eager;
     383           0 :   vinyl->style        = FD_VINYL_BSTREAM_CTL_STYLE_RAW;
     384           0 :   vinyl->line_idx_lru = 0U;
     385           0 :   vinyl->pair_cnt     = 0UL;
     386           0 :   vinyl->garbage_sz   = 0UL;
     387             : 
     388           0 :   TEST( fd_vinyl_meta_join( vinyl->meta, _meta, _ele )==vinyl->meta );
     389             : 
     390           0 :   TEST( fd_vinyl_data_init( vinyl->data, _obj, vinyl->obj_footprint, obj_laddr0 )==vinyl->data );
     391           0 :   fd_vinyl_data_reset( NULL, 0UL, 0UL, 0, vinyl->data );
     392             : 
     393           0 :   fd_vinyl_line_t * line = vinyl->line;
     394           0 :   for( ulong line_idx=0UL; line_idx<line_cnt; line_idx++ ) { /* empty lines, linked into a ring: older = idx-1, newer = idx+1, wrapping at the ends */
     395           0 :     line[ line_idx ].obj            = NULL;
     396           0 :     line[ line_idx ].ele_idx        = ULONG_MAX;
     397           0 :     line[ line_idx ].ctl            = fd_vinyl_line_ctl( 0UL, 0L);
     398           0 :     line[ line_idx ].line_idx_older = (uint)fd_ulong_if( line_idx!=0UL,          line_idx-1UL, line_cnt-1UL );
     399           0 :     line[ line_idx ].line_idx_newer = (uint)fd_ulong_if( line_idx!=line_cnt-1UL, line_idx+1UL, 0UL          );
     400           0 :   }
     401             : 
     402           0 : # undef TEST
     403             : 
     404           0 :   ulong snapwm_tile_idx = fd_topo_find_tile( topo, "snapwm", 0UL );
     405           0 :   ulong genesi_tile_idx = fd_topo_find_tile( topo, "genesi", 0UL );
     406           0 :   int boot_from_genesis = snapwm_tile_idx==ULONG_MAX; /* no snapwm tile in the topology selects the genesis boot path */
     407             : 
     408           0 :   if( FD_UNLIKELY( boot_from_genesis ) ) {
     409           0 :     if( FD_UNLIKELY( genesi_tile_idx==ULONG_MAX ) ) {
     410           0 :       FD_LOG_CRIT(( "booting from genesis requires a genesi tile, but none was found in the topology (genesi tile idx %lu)", genesi_tile_idx ));
     411           0 :     }
     412           0 :     if( FD_UNLIKELY( snapwm_tile_idx!=ULONG_MAX ) ) { /* cannot fire: this branch is only taken when snapwm_tile_idx==ULONG_MAX */
     413           0 :       FD_LOG_CRIT(( "booting from genesis with snapshot load tiles idx %lu is not supported", snapwm_tile_idx ));
     414           0 :     }
     415             :     /* When booting from genesis, accdb tile boots immediately, which
     416             :        allows the genesi tile to become a vinyl client. */
     417           0 :     ctx->boot.state_expected = ULONG_MAX;
     418           0 :     ctx->boot.state          = NULL;
     419           0 :     ctx->boot.pair_cnt       = NULL;
     420             :     /* Initialize the bstream sync block: there is no need to keep
     421             :        the return (fd_vinyl_io_t *), but it must be checked.  17UL is the length of "accounts-genesis" including its terminating NUL. */
     422           0 :     FD_TEST( !!fd_vinyl_io_bd_init( ctx->io_mem, IO_SPAD_MAX, ctx->bstream_fd, 1/*reset*/, "accounts-genesis", 17UL, ctx->boot.from_genesis.io_seed ) );
     423           0 :   } else {
     424           0 :     if( FD_UNLIKELY( snapwm_tile_idx==ULONG_MAX ) ) { /* cannot fire: this branch is only taken when snapwm_tile_idx!=ULONG_MAX */
     425           0 :       FD_LOG_CRIT(( "booting with incorrect snapshot load tiles idx snapwm %lu", snapwm_tile_idx ));
     426           0 :     }
     427             :     /* Boot state and expected state */
     428           0 :     fd_topo_tile_t const * snapwm_tile = &topo->tiles[ snapwm_tile_idx ];
     429           0 :     FD_TEST( snapwm_tile->metrics );
     430           0 :     ctx->boot.state_expected = 2; /* NOTE(review): presumably the snapwm STATUS gauge value that signals snapshot load is done -- confirm */
     431           0 :     ctx->boot.state          = &fd_metrics_tile( snapwm_tile->metrics )[ MIDX( GAUGE, TILE, STATUS ) ];
     432           0 :     ctx->boot.pair_cnt       = &fd_metrics_tile( snapwm_tile->metrics )[ MIDX( GAUGE, SNAPWM, ACCOUNTS_ACTIVE ) ];
     433           0 :   }
     434             : 
     435             :   /* Vinyl limit on the number of pairs the meta map will accept */
     436           0 :   ctx->pair_cnt_limit = tile->accdb.pair_cnt_limit;
     437           0 :   FD_TEST( ctx->pair_cnt_limit!=ULONG_MAX );
     438             : 
     439             :   /* Discover mapped clients */
     440             : 
     441           0 :   ulong burst_free = FD_VINYL_REQ_MAX;       /* request slots not yet assigned to a client */
     442           0 :   ulong quota_free = vinyl->line_cnt - 1UL;  /* line quota not yet assigned to a client */
     443           0 :   ctx->exec_max = 0UL;
     444             : 
     445           0 :   for( ulong i=0UL; i<(tile->uses_obj_cnt); i++ ) {
     446             : 
     447           0 :     ulong rq_obj_id = tile->uses_obj_id[ i ];
     448           0 :     fd_topo_obj_t const * rq_obj = &topo->objs[ rq_obj_id ];
     449           0 :     if( strcmp( rq_obj->name, "vinyl_rq" ) ) continue; /* only vinyl_rq objects describe clients */
     450             : 
     451           0 :     if( FD_UNLIKELY( ctx->client_cnt>=FD_VINYL_CLIENT_MAX ) ) {
     452           0 :       FD_LOG_ERR(( "too many vinyl clients (increase FD_VINYL_CLIENT_MAX)" ));
     453           0 :     }
     454             : 
     455           0 :     ulong burst_max       = 1UL;
     456           0 :     ulong link_id         = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.link_id",         rq_obj_id );
     457           0 :     ulong quota_max       = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.quota_max",       rq_obj_id );
     458           0 :     ulong req_pool_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.req_pool_obj_id", rq_obj_id );
     459           0 :     ulong cq_obj_id       = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.cq_obj_id",       rq_obj_id );
     460           0 :     FD_TEST( link_id        !=ULONG_MAX );
     461           0 :     FD_TEST( quota_max      !=ULONG_MAX );
     462           0 :     FD_TEST( req_pool_obj_id!=ULONG_MAX );
     463           0 :     FD_TEST( cq_obj_id      !=ULONG_MAX );
     464             : 
     465           0 :     if( FD_UNLIKELY( burst_max > burst_free ) ) {
     466           0 :       FD_LOG_ERR(( "too large burst_max (increase FD_VINYL_REQ_MAX or decrease burst_max)" ));
     467           0 :     }
     468             : 
     469           0 :     if( FD_UNLIKELY( quota_max > fd_ulong_min( quota_free, FD_VINYL_COMP_QUOTA_MAX ) ) ) {
     470           0 :       FD_LOG_ERR(( "too large quota_max (increase line_cnt (currently %lu, free %lu) or decrease quota_max (currently %lu))",
     471           0 :                    vinyl->line_cnt, quota_free, quota_max ));
     472           0 :     }
     473             : 
     474           0 :     for( ulong client_idx=0UL; client_idx<ctx->client_cnt; client_idx++ ) {
     475           0 :       if( FD_UNLIKELY( ctx->_client[ client_idx ].link_id==link_id ) ) {
     476           0 :         FD_LOG_ERR(( "client already joined with this link_id (%lu)", link_id ));
     477           0 :       }
     478           0 :     }
     479             : 
     480           0 :     fd_topo_obj_t const *  req_pool_obj = &topo->objs[ req_pool_obj_id ];
     481           0 :     fd_topo_wksp_t const * client_wksp  = &topo->workspaces[ req_pool_obj->wksp_id ]; /* workspace holding the request pool bounds the client's valid gaddrs */
     482             : 
     483           0 :     fd_vinyl_rq_t * rq; FD_TEST( (rq = fd_vinyl_rq_join( fd_topo_obj_laddr( topo, rq_obj_id ) )) );
     484           0 :     fd_vinyl_cq_t * cq; FD_TEST( (cq = fd_vinyl_cq_join( fd_topo_obj_laddr( topo, cq_obj_id ) )) );
     485             : 
     486           0 :     fd_shmem_join_info_t join_info;
     487           0 :     FD_TEST( fd_shmem_join_query_by_join( client_wksp->wksp, &join_info)==0 );
     488           0 :     FD_LOG_INFO(( "registered client %lu: req_gaddr=%s:%lu cq_gaddr=%s:%lu",
     489           0 :                   ctx->client_cnt,
     490           0 :                   fd_wksp_containing( rq )->name, fd_wksp_gaddr_fast( fd_wksp_containing( rq ), rq ),
     491           0 :                   fd_wksp_containing( cq )->name, fd_wksp_gaddr_fast( fd_wksp_containing( cq ), cq ) ));
     492           0 :     ctx->_client[ ctx->client_cnt ] = (fd_vinyl_client_t) {
     493           0 :       .rq        = rq,
     494           0 :       .cq        = cq,
     495           0 :       .burst_max = burst_max, /* same value that was admitted against burst_free above */
     496           0 :       .seq       = 0UL,
     497           0 :       .link_id   = link_id,
     498           0 :       .laddr0    = (ulong)join_info.shmem,
     499           0 :       .laddr1    = (ulong)join_info.shmem + join_info.page_cnt*join_info.page_sz,
     500           0 :       .quota_rem = quota_max,
     501           0 :       .quota_max = quota_max
     502           0 :     };
     503           0 :     ctx->client_cnt++;
     504             : 
     505           0 :     quota_free -= quota_max;
     506           0 :     burst_free -= burst_max;
     507             : 
     508             :     /* Every client_cnt run loop iterations we receive at most:
     509             : 
     510             :          sum_clients recv_max = FD_VINYL_REQ_MAX - burst_free
     511             : 
     512             :        requests.  To guarantee we process requests fast enough
     513             :        that we never overrun our receive queue, under maximum
     514             :        client load, we need to process:
     515             : 
     516             :          sum_clients recv_max / client_cnt
     517             : 
     518             :        requests per run loop iteration.  We thus set exec_max
     519             :        to the ceil sum_clients recv_max / client_cnt. */
     520             : 
     521           0 :     ctx->exec_max = (FD_VINYL_REQ_MAX - burst_free + ctx->client_cnt - 1UL) / ctx->client_cnt;
     522             : 
     523           0 :   } /* client join loop */
     524             : 
     525           0 : }
     526             : 
     527             : /* during_housekeeping is called periodically (approx every STEM_LAZY ns).
     528             :    It lazily boots the I/O backend, then appends partitions, compacts, syncs and checks io_uring health. */
     529             : static void
     530           0 : during_housekeeping( fd_vinyl_tile_t * ctx ) {
     531             : 
     532           0 :   fd_vinyl_t * vinyl = ctx->vinyl;
     533             :   /* Not booted yet: wait until the boot state word (if any) reads state_expected, then bring up I/O */
     534           0 :   if( FD_UNLIKELY( !ctx->booted ) ) {
     535           0 :     if( FD_UNLIKELY( !!ctx->boot.state ) ) {
     536           0 :       ulong const boot_state = FD_VOLATILE_CONST( *ctx->boot.state );
     537           0 :       if( boot_state!=ctx->boot.state_expected ) {
     538           0 :         fd_log_sleep( 1e6 ); /* 1 ms */
     539           0 :         return;
     540           0 :       }
     541           0 :     }
     542             :     /* Use the io_uring backend when a ring fd is available, otherwise the blocking backend */
     543           0 :     if( ctx->ring->ioring_fd!=-1 ) {
     544           0 :       vinyl->io = fd_vinyl_io_ur_init( ctx->io_mem, IO_SPAD_MAX, ctx->bstream_fd, ctx->ring );
     545           0 :       if( FD_UNLIKELY( !vinyl->io ) ) FD_LOG_ERR(( "Failed to initialize io_uring I/O backend for account database" ));
     546           0 :     } else {
     547           0 :       vinyl->io = fd_vinyl_io_bd_init( ctx->io_mem, IO_SPAD_MAX, ctx->bstream_fd, 0, NULL, 0UL, 0UL );
     548           0 :       if( FD_UNLIKELY( !vinyl->io ) ) FD_LOG_ERR(( "Failed to initialize blocking I/O backend for account database" ));
     549           0 :     }
     550           0 :     vinyl->pair_cnt = 0UL;
     551           0 :     if( !!ctx->boot.pair_cnt ) {
     552           0 :       vinyl->pair_cnt = FD_VOLATILE_CONST( *ctx->boot.pair_cnt );
     553           0 :     }
     554             : 
     555             :     /* Once snapct tile exits, boot up vinyl */
     556           0 :     FD_LOG_INFO(( "vinyl server starting with %lu active records", vinyl->pair_cnt ));
     557             : 
     558           0 :     ctx->booted = 1;
     559           0 :   }
     560             : 
     561             :   /* If we've written enough to justify appending a parallel
     562             :      recovery partition, append one. */
     563             : 
     564           0 :   ulong seq_future = fd_vinyl_io_seq_future( vinyl->io );
     565           0 :   if( FD_UNLIKELY( (seq_future - ctx->seq_part) > vinyl->part_thresh ) ) {
     566             : 
     567           0 :     ulong seq = fd_vinyl_io_append_part( vinyl->io, ctx->seq_part, ctx->accum_dead_cnt, 0UL, NULL, 0UL );
     568           0 :     FD_CRIT( fd_vinyl_seq_eq( seq, seq_future ), "corruption detected" );
     569           0 :     ctx->seq_part = seq + FD_VINYL_BSTREAM_BLOCK_SZ;
     570             : 
     571           0 :     ctx->accum_dead_cnt = 0UL;
     572             :     /* Account one block-sized item of garbage alongside the partition append */
     573           0 :     ctx->accum_garbage_cnt++;
     574           0 :     ctx->accum_garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
     575             : 
     576           0 :     fd_vinyl_io_commit( vinyl->io, FD_VINYL_IO_FLAG_BLOCKING );
     577           0 :     FD_MCNT_INC( ACCDB, BLOCKS_PART, 1UL );
     578             : 
     579           0 :   }
     580             : 
     581             :   /* Let the number of items of garbage generated since the last
     582             :      compaction be accum_garbage_cnt and let the steady state
     583             :      average number of live / garbage items in the bstream's past be
     584             :      L / G (i.e. L is the average value of pair_cnt).  The average
     585             :      number of pieces of garbage collected per garbage collection
     586             :      round is thus G / (L + G).  If we do compact_max rounds of garbage
     587             :      collection in this async handling, we expect to collect
     588             : 
     589             :           compact_max G / (L + G)
     590             : 
     591             :      items of garbage on average.  To make sure we collect garbage
     592             :      faster than we generate it on average, we then require:
     593             : 
     594             :           accum_garbage_cnt <~ compact_max G / (L + G)
     595             :        -> compact_max >~ (L + G) accum_garbage_cnt / G
     596             : 
     597             :      Let 2^-gc_eager be the maximum fraction of items in the
     598             :      bstream's past we are willing to tolerate as garbage on average.
     599             :      We then have G = 2^-gc_eager (L + G).  This implies:
     600             : 
     601             :        -> compact_max >~ accum_garbage_cnt 2^gc_eager
     602             : 
     603             :      When accum_garbage_cnt is 0, we use a compact_max of 1 to do
     604             :      compaction rounds at a minimum rate all the time.  This allows
     605             :      transients (e.g. a sudden change to new steady state
     606             :      equilibrium, temporary disabling of garbage collection at key
     607             :      times for highest performance, etc) and unaccounted zero
     608             :      padding garbage to be absorbed when nothing else is going on. */
     609             : 
     610           0 :   int gc_eager = vinyl->gc_eager;
     611           0 :   if( FD_LIKELY( gc_eager>=0 ) ) {
     612             : 
     613             :     /* Saturating wide left shift */
     614           0 :     ulong overflow    = (ctx->accum_garbage_cnt >> (63-gc_eager) >> 1); /* sigh ... avoid wide shift UB */
     615           0 :     ulong compact_max = fd_ulong_max( fd_ulong_if( !overflow, ctx->accum_garbage_cnt << gc_eager, ULONG_MAX ), 1UL );
     616             :     /* Fold the accumulated garbage into vinyl's running total and reset the accumulators */
     617             :     /**/                                        ctx->accum_garbage_cnt = 0UL;
     618           0 :     vinyl->garbage_sz += ctx->accum_garbage_sz; ctx->accum_garbage_sz  = 0UL;
     619             : 
     620           0 :     ulong garbage_pre = vinyl->garbage_sz;
     621           0 :     fd_vinyl_compact( vinyl, compact_max );
     622           0 :     FD_MCNT_INC( ACCDB, CUM_GC_BYTES, garbage_pre - vinyl->garbage_sz );
     623             : 
     624           0 :   }
     625             : 
     626             :   /* Update vinyl sync block
     627             :      (Required to reclaim bstream space freed by compaction) */
     628             : 
     629           0 :   long now = fd_log_wallclock();
     630           0 :   if( now >= ctx->sync_next_ns ) {
     631           0 :     ctx->sync_next_ns = now + (long)30e9; /* every 30 seconds */
     632           0 :     fd_vinyl_io_sync( vinyl->io, FD_VINYL_IO_FLAG_BLOCKING );
     633           0 :   }
     634             : 
     635             :   /* Service io_uring instance */
     636             : 
     637           0 :   if( ctx->ring->ioring_fd!=-1 ) {
     638           0 :     uint sq_drops = fd_io_uring_sq_dropped( ctx->ring->sq );
     639           0 :     if( FD_UNLIKELY( sq_drops ) ) {
     640           0 :       FD_LOG_CRIT(( "kernel io_uring dropped I/O requests, cannot continue (sq_dropped=%u)", sq_drops ));
     641           0 :     }
     642             : 
     643           0 :     uint cq_drops = fd_io_uring_cq_overflow( ctx->ring->cq );
     644           0 :     if( FD_UNLIKELY( cq_drops ) ) {
     645           0 :       FD_LOG_CRIT(( "kernel io_uring dropped I/O completions, cannot continue (cq_overflow=%u)", cq_drops ));
     646           0 :     }
     647           0 :   }
     648             : 
     649             :   /* Keep an eye on vinyl meta map utilization (pair_cnt) */
     650           0 :   if( FD_UNLIKELY( vinyl->pair_cnt > ctx->pair_cnt_limit ) ) {
     651           0 :     FD_LOG_ERR(( "accdb accounts count %lu exceeded limit %lu", vinyl->pair_cnt, ctx->pair_cnt_limit ));
     652           0 :   }
     653           0 : }
     654             : 
     655             : /* If should_shutdown returns non-zero, the vinyl tile is shut down.
     656             :    Returns 0 until booted and ctx->shutdown is set; then writes a final partition, syncs and returns 1. */
     657             : static int
     658           0 : should_shutdown( fd_vinyl_tile_t * ctx ) {
     659           0 :   if( FD_UNLIKELY( !ctx->booted ) ) return 0;
     660           0 :   if( FD_LIKELY( !ctx->shutdown ) ) return 0;
     661             : 
     662           0 :   fd_vinyl_t *    vinyl = ctx->vinyl;
     663           0 :   fd_vinyl_io_t * io    = vinyl->io;
     664             :   /* Requests received into the local queue but never executed */
     665           0 :   ulong discard_cnt = ctx->req_tail - ctx->req_head;
     666             : 
     667             :   /* Append the final partition and sync so we can resume with a fast
     668             :      parallel recovery */
     669             : 
     670           0 :   FD_MCNT_INC( ACCDB, BLOCKS_PART, 1UL );
     671           0 :   fd_vinyl_io_append_part( io, ctx->seq_part, ctx->accum_dead_cnt, 0UL, NULL, 0UL );
     672             : 
     673           0 :   ctx->accum_dead_cnt = 0UL;
     674             : 
     675           0 :   ctx->accum_garbage_cnt++;
     676           0 :   ctx->accum_garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
     677             : 
     678           0 :   fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
     679             : 
     680           0 :   fd_vinyl_io_sync( io, FD_VINYL_IO_FLAG_BLOCKING );
     681             : 
     682             :   /* Drain outstanding accumulators */
     683             : 
     684             :   /**/                                        ctx->accum_garbage_cnt = 0UL;
     685           0 :   vinyl->garbage_sz += ctx->accum_garbage_sz; ctx->accum_garbage_sz  = 0UL;
     686             : 
     687             :   /* Tally quota the clients still hold (acquires never released) */
     688             : 
     689           0 :   ulong released_cnt = 0UL;
     690           0 :   for( ulong client_idx=0UL; client_idx<ctx->client_cnt; client_idx++ ) {
     691           0 :     released_cnt += (ctx->_client[ client_idx ].quota_max - ctx->_client[ client_idx ].quota_rem);
     692           0 :   }
     693             : 
     694           0 :   if( FD_UNLIKELY( discard_cnt     ) ) FD_LOG_WARNING(( "halt discarded %lu received requests",   discard_cnt     ));
     695           0 :   if( FD_UNLIKELY( released_cnt    ) ) FD_LOG_WARNING(( "halt released %lu outstanding acquires", released_cnt    ));
     696           0 :   if( FD_UNLIKELY( ctx->client_cnt ) ) FD_LOG_WARNING(( "halt disconnected %lu clients",          ctx->client_cnt ));
     697             : 
     698           0 :   return 1;
     699           0 : }
     700             : /* metrics_write publishes account, I/O, bstream and garbage metrics; it does nothing until the tile has booted */
     701             : static void
     702           0 : metrics_write( fd_vinyl_tile_t * ctx ) {
     703           0 :   if( FD_UNLIKELY( !ctx->booted ) ) return;
     704           0 :   fd_vinyl_t *    vinyl = ctx->vinyl;
     705           0 :   fd_vinyl_io_t * io    = vinyl->io;
     706             :   /* Account count and how many more fit under pair_cnt_limit (saturating subtract) */
     707           0 :   ulong vinyl__pair_cnt_left = fd_ulong_sat_sub( ctx->pair_cnt_limit, vinyl->pair_cnt );
     708           0 :   FD_MGAUGE_SET( ACCDB, ACCOUNTS,                     vinyl->pair_cnt      );
     709           0 :   FD_MGAUGE_SET( ACCDB, ACCOUNT_INDEX_REMAINING_FREE, vinyl__pair_cnt_left );
     710             :   /* Read / write operation and byte counters mirrored from the io object (cache vs file) */
     711           0 :   FD_MCNT_SET( ACCDB, READ_OPS_IO_CACHE,    io->cache_read_cnt     );
     712           0 :   FD_MCNT_SET( ACCDB, READ_BYTES_IO_CACHE,  io->cache_read_tot_sz  );
     713           0 :   FD_MCNT_SET( ACCDB, WRITE_OPS_IO_CACHE,   io->cache_write_cnt    );
     714           0 :   FD_MCNT_SET( ACCDB, WRITE_BYTES_IO_CACHE, io->cache_write_tot_sz );
     715           0 :   FD_MCNT_SET( ACCDB, READ_OPS_FILE,        io->file_read_cnt      );
     716           0 :   FD_MCNT_SET( ACCDB, READ_BYTES_FILE,      io->file_read_tot_sz   );
     717           0 :   FD_MCNT_SET( ACCDB, WRITE_OPS_FILE,       io->file_write_cnt     );
     718           0 :   FD_MCNT_SET( ACCDB, WRITE_BYTES_FILE,     io->file_write_tot_sz  );
     719             :   /* File capacity, and used bytes measured as the distance from seq_ancient to seq_future */
     720           0 :   FD_MGAUGE_SET( ACCDB, FILE_CAPACITY_BYTES, ctx->bstream_file_sz );
     721           0 :   FD_MGAUGE_SET( ACCDB, FILE_USED_BYTES,     io->seq_future - io->seq_ancient );
     722             :   /* Raw bstream sequence markers */
     723           0 :   FD_MGAUGE_SET( ACCDB, BSTREAM_SEQ_ANCIENT, io->seq_ancient );
     724           0 :   FD_MGAUGE_SET( ACCDB, BSTREAM_SEQ_PAST,    io->seq_past    );
     725           0 :   FD_MGAUGE_SET( ACCDB, BSTREAM_SEQ_PRESENT, io->seq_present );
     726           0 :   FD_MGAUGE_SET( ACCDB, BSTREAM_SEQ_FUTURE,  io->seq_future  );
     727             : 
     728           0 :   FD_MGAUGE_SET( ACCDB, GARBAGE_BYTES, vinyl->garbage_sz );
     729           0 : }
     730             : 
     731             : /* before_credit runs every main loop iteration.  It polls one client's request
     732             :    queue (round robin), then executes up to exec_max queued requests.  Sets *charge_busy when a request arrives. */
     733             : static void
     734             : before_credit( fd_vinyl_tile_t *   ctx,
     735             :                fd_stem_context_t * stem,
     736           0 :                int *               charge_busy ) {
     737           0 :   (void)stem;
     738           0 :   if( FD_UNLIKELY( !ctx->booted ) ) return;
     739             :   /* The locals below are also referenced by the fd_vinyl_case_*.c fragments included further down */
     740           0 :   fd_vinyl_t * vinyl = ctx->vinyl;
     741             : 
     742           0 :   fd_vinyl_io_t *   io   = vinyl->io;
     743           0 :   fd_vinyl_meta_t * meta = vinyl->meta;
     744           0 :   fd_vinyl_line_t * line = vinyl->line;
     745           0 :   fd_vinyl_data_t * data = vinyl->data;
     746             : 
     747           0 :   ulong pair_max = vinyl->pair_max;
     748             : 
     749           0 :   fd_vinyl_meta_ele_t * ele0       = meta->ele;
     750           0 :   ulong                 ele_max    = meta->ele_max;
     751           0 :   ulong                 meta_seed  = meta->seed;
     752           0 :   ulong *               lock       = meta->lock;
     753           0 :   int                   lock_shift = meta->lock_shift;
     754             : 
     755           0 :   ulong                       data_laddr0 = (ulong)data->laddr0;
     756           0 :   fd_vinyl_data_vol_t const * vol         =        data->vol;
     757           0 :   ulong                       vol_cnt     =        data->vol_cnt;
     758             : 
     759           0 :   ulong line_cnt  = vinyl->line_cnt;
     760             : 
     761             :   /* Select client to poll this run loop iteration */
     762             : 
     763           0 :   ctx->client_idx = fd_ulong_if( ctx->client_idx+1UL<ctx->client_cnt, ctx->client_idx+1UL, 0UL );
     764             : 
     765           0 :   fd_vinyl_client_t * client = ctx->_client + ctx->client_idx;
     766             : 
     767           0 :   fd_vinyl_rq_t * rq        = client->rq;
     768           0 :   ulong           seq       = client->seq;
     769           0 :   ulong           burst_max = client->burst_max;
     770           0 :   ulong           link_id   = client->link_id;
     771             :   /* Work on local copies of the accumulators; they are written back to ctx at the end */
     772           0 :   ulong accum_dead_cnt    = ctx->accum_dead_cnt;
     773           0 :   ulong accum_garbage_cnt = ctx->accum_garbage_cnt;
     774           0 :   ulong accum_garbage_sz  = ctx->accum_garbage_sz;
     775             : 
     776             :   /* Enqueue up to burst_max requests from this client into the
     777             :      local request queue.  Using burst_max << FD_VINYL_REQ_MAX
     778             :      allows applications to prevent a bursty client from starving
     779             :      other clients of resources while preserving the spatial and
     780             :      temporal locality of reasonably sized O(burst_max) bursts from
     781             :      an individual client in processing below.  Each run loop
     782             :      iteration can enqueue up to burst_max requests. */
     783             : 
     784           0 :   for( ulong recv_rem=fd_ulong_min( FD_VINYL_REQ_MAX-(ctx->req_tail-ctx->req_head), burst_max ); recv_rem; recv_rem-- ) {
     785           0 :     fd_vinyl_req_t * req = ctx->_req + (ctx->req_tail & (FD_VINYL_REQ_MAX-1UL));
     786             : 
     787           0 :     long diff = fd_vinyl_rq_recv( rq, seq, req );
     788             : 
     789           0 :     if( FD_LIKELY( diff>0L ) ) break; /* No requests waiting in rq at this time */
     790             : 
     791           0 :     if( FD_UNLIKELY( diff ) ) FD_LOG_CRIT(( "client overran request queue" ));
     792             : 
     793           0 :     *charge_busy = 1;
     794           0 :     seq++;
     795             : 
     796             :     /* We got the next request.  Decide if we should accept it.
     797             : 
     798             :        Specifically, a request whose link_id doesn't match this
     799             :        client's link_id (e.g. an unknown link_id or one matching a
     800             :        different client's link_id) is treated as fatal: we don't
     801             :        know where, or even if it is safe, to send the completion.
     802             :        Even if the request provided an out-of-band location to send
     803             :        the completion (comp_gaddr!=0), we have no reason to trust it.
     804             : 
     805             :        (Upstream vinyl drops such requests instead, which lets a
     806             :        client share one rq between multiple vinyl instances by using
     807             :        a different link_id for each.  This tile logs critically on a
     808             :        mismatch instead, so that usage is not supported
     809             :        here.)
     810             : 
     811             :        Since we know the client_idx at this point, given a matching
     812             :        link_id, we stash the client_idx in the pending req link_id
     813             :        to eliminate the need to maintain a link_id<>client_idx map
     814             :        in the execution loop below. */
     815             : 
     816           0 :     if( FD_UNLIKELY( req->link_id!=link_id ) ) {
     817           0 :       FD_LOG_CRIT(( "received request from link_id %lu, but request specifies incorrect link_id %lu",
     818           0 :                     link_id, req->link_id ));
     819           0 :     }
     820             : 
     821           0 :     req->link_id = ctx->client_idx;
     822             : 
     823           0 :     ctx->req_tail++;
     824           0 :   }
     825             : 
     826           0 :   client->seq = seq;
     827             : 
     828             :   /* Execute received requests */
     829             : 
     830           0 :   for( ulong exec_rem=fd_ulong_min( ctx->req_tail-ctx->req_head, ctx->exec_max ); exec_rem; exec_rem-- ) {
     831           0 :     fd_vinyl_req_t * req = ctx->_req + ((ctx->req_head++) & (FD_VINYL_REQ_MAX-1UL));
     832             : 
     833             :     /* Determine the client that sent this request and unpack the
     834             :        completion fields.  A non-NULL but unmappable out-of-band
     835             :        completion location is treated as fatal because we can't send
     836             :        the completion in the expected manner and, in lieu of that, the
     837             :        receivers aren't expecting any completion to come via the cq
     838             :        (if any).  Note that this implies requests that don't produce a
     839             :        completion (e.g. FETCH and FLUSH) need to either provide NULL
     840             :        or a valid non-NULL location for comp_gaddr to pass this
     841             :        validation (this is not a burden practically). */
     842             : 
     843           0 :     ulong  req_id     =        req->req_id;
     844           0 :     ulong  client_idx =        req->link_id; /* See note above about link_id / client_idx conversion */
     845           0 :     ulong  batch_cnt  = (ulong)req->batch_cnt;
     846           0 :     ulong  comp_gaddr =        req->comp_gaddr;
     847             : 
     848           0 :     fd_vinyl_client_t * client = ctx->_client + client_idx;
     849             : 
     850           0 :     fd_vinyl_cq_t * cq            = client->cq;
     851           0 :     ulong           link_id       = client->link_id;
     852           0 :     ulong           client_laddr0 = client->laddr0;
     853           0 :     ulong           client_laddr1 = client->laddr1;
     854           0 :     ulong           quota_rem     = client->quota_rem;
     855             : 
     856           0 :     FD_CRIT( quota_rem<=client->quota_max, "corruption detected" );
     857             : 
     858           0 :     fd_vinyl_comp_t * comp = MAP_REQ_GADDR( comp_gaddr, fd_vinyl_comp_t, 1UL );
     859           0 :     if( FD_UNLIKELY( (!comp) & (!!comp_gaddr) ) ) {
     860           0 :       FD_LOG_CRIT(( "client with link_id=%lu requested completion at invalid gaddr %lu",
     861           0 :                     link_id, comp_gaddr ));
     862           0 :     }
     863             : 
     864           0 :     int   comp_err   = 1;
     865           0 :     ulong fail_cnt   = 0UL;
     866             : 
     867           0 :     ulong read_cnt   = 0UL;
     868           0 :     ulong append_cnt = 0UL;
     869             :     ulong const dead_cnt_req0 = accum_dead_cnt; /* per-request baseline for the BLOCKS_DEAD / BLOCKS_PAIR metrics */
     870           0 :     ulong accum_cache_hit = 0UL;
     871           0 :     switch( req->type ) {
     872             : 
     873           0 : #   include "../../vinyl/fd_vinyl_case_acquire.c"
     874           0 : #   include "../../vinyl/fd_vinyl_case_release.c"
     875           0 : #   include "../../vinyl/fd_vinyl_case_erase.c"
     876             :     /* FIXME support more request types */
     877             : 
     878           0 :     default:
     879           0 :       FD_LOG_CRIT(( "unsupported request type %u", (uint)req->type ));
     880           0 :       comp_err = FD_VINYL_ERR_INVAL;
     881           0 :       break;
     882           0 :     }
     883             : 
     884           0 :     FD_MCNT_INC( ACCDB, REQUEST_BATCHES, 1UL );
     885           0 :     switch( req->type ) {
     886           0 :     case FD_VINYL_REQ_TYPE_ACQUIRE:
     887           0 :       FD_MCNT_INC( ACCDB, REQUESTS_ACQUIRE,      batch_cnt       );
     888           0 :       FD_MCNT_INC( ACCDB, READ_OPS_SHARED_CACHE, accum_cache_hit );
     889           0 :       break;
     890           0 :     case FD_VINYL_REQ_TYPE_RELEASE:
     891             :       /* FIXME missing metrics:
     892             :          - ReadBytes(SharedCache)
     893             :          - WriteOps(SharedCache)
     894             :          - WriteBytes(SharedCache) */
     895           0 :       FD_MCNT_INC( ACCDB, REQUESTS_RELEASE, batch_cnt );
     896           0 :       break;
     897           0 :     case FD_VINYL_REQ_TYPE_ERASE:
     898           0 :       FD_MCNT_INC( ACCDB, REQUESTS_ERASE,   batch_cnt );
     899           0 :       break;
     900           0 :     }
     901             :     /* Wait for each outstanding read of this request, then validate, decode and publish the pair */
     902           0 :     for( ; read_cnt; read_cnt-- ) {
     903           0 :       fd_vinyl_io_rd_t * _rd; /* avoid pointer escape */
     904           0 :       fd_vinyl_io_poll( io, &_rd, FD_VINYL_IO_FLAG_BLOCKING );
     905           0 :       fd_vinyl_io_rd_t * rd = _rd;
     906             : 
     907           0 :       fd_vinyl_data_obj_t *     obj      = (fd_vinyl_data_obj_t *)    rd->ctx;
     908           0 :       ulong                     seq      =                            rd->seq;
     909           0 :       fd_vinyl_bstream_phdr_t * cphdr    = (fd_vinyl_bstream_phdr_t *)rd->dst;
     910           0 :       ulong                     cpair_sz =                            rd->sz;  (void)cpair_sz;
     911             : 
     912           0 :       fd_vinyl_data_obj_t * cobj = (fd_vinyl_data_obj_t *)fd_ulong_align_dn( (ulong)rd, FD_VINYL_BSTREAM_BLOCK_SZ );
     913             : 
     914           0 :       FD_CRIT( cphdr==fd_vinyl_data_obj_phdr( cobj ), "corruption detected" );
     915             : 
     916           0 :       ulong cpair_ctl = cphdr->ctl;
     917             : 
     918           0 :       int   cpair_type    = fd_vinyl_bstream_ctl_type ( cpair_ctl );
     919           0 :       int   cpair_style   = fd_vinyl_bstream_ctl_style( cpair_ctl );
     920           0 :       ulong cpair_val_esz = fd_vinyl_bstream_ctl_sz   ( cpair_ctl );
     921             : 
     922           0 :       FD_CRIT( cpair_type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR,            "corruption detected" );
     923           0 :       FD_CRIT( cpair_sz  ==fd_vinyl_bstream_pair_sz( cpair_val_esz ), "corruption detected" );
     924             : 
     925           0 :       schar * rd_err = cobj->rd_err;
     926             : 
     927           0 :       FD_CRIT ( rd_err,                                          "corruption detected" );
     928           0 :       FD_ALERT( fd_vinyl_data_is_valid_obj( obj, vol, vol_cnt ), "corruption detected" );
     929             : 
     930           0 :       ulong line_idx = obj->line_idx;
     931             : 
     932           0 :       FD_CRIT( line_idx<line_cnt,                 "corruption detected" );
     933           0 :       FD_CRIT( line[ line_idx ].obj==obj,         "corruption detected" );
     934             : 
     935           0 :       ulong ele_idx = line[ line_idx ].ele_idx;
     936             : 
     937           0 :       FD_CRIT ( ele_idx<ele_max,                                                          "corruption detected" );
     938           0 :       FD_ALERT( !memcmp( &ele0[ ele_idx ].phdr, cphdr, sizeof(fd_vinyl_bstream_phdr_t) ), "corruption detected" );
     939           0 :       FD_CRIT ( ele0[ ele_idx ].seq     ==seq,                                            "corruption detected" );
     940           0 :       FD_CRIT ( ele0[ ele_idx ].line_idx==line_idx,                                       "corruption detected" );
     941             : 
     942             :       /* Decode the pair */
     943             : 
     944           0 :       char * val    = (char *)fd_vinyl_data_obj_val( obj );
     945           0 :       ulong  val_sz = (ulong)cphdr->info.val_sz;
     946             : 
     947           0 :       FD_CRIT( val_sz <= FD_VINYL_VAL_MAX,                 "corruption detected" );
     948           0 :       FD_CRIT( fd_vinyl_data_obj_val_max( obj ) >= val_sz, "corruption detected" );
     949             : 
     950           0 :       if( FD_LIKELY( cpair_style==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) {
     951             : 
     952           0 :         FD_CRIT( obj==cobj,             "corruption detected" );
     953           0 :         FD_CRIT( cpair_val_esz==val_sz, "corruption detected" );
     954             : 
     955           0 :       } else if( cpair_style==FD_VINYL_BSTREAM_CTL_STYLE_LZ4 ) {
     956             : 
     957           0 :         char const * cval    = (char const *)fd_vinyl_data_obj_val( cobj );
     958           0 :         ulong        cval_sz = fd_vinyl_bstream_ctl_sz( cpair_ctl );
     959             : 
     960           0 :         ulong _val_sz = (ulong)LZ4_decompress_safe( cval, val, (int)cval_sz, (int)val_sz );
     961           0 :         if( FD_UNLIKELY( _val_sz!=val_sz ) ) FD_LOG_CRIT(( "LZ4_decompress_safe failed" ));
     962             : 
     963           0 :         fd_vinyl_data_free( data, cobj );
     964             :         /* Rewrite the destination header as a RAW pair now that val holds the decompressed bytes */
     965           0 :         fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
     966             : 
     967           0 :         phdr->ctl  = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
     968           0 :         phdr->key  = cphdr->key;
     969           0 :         phdr->info = cphdr->info;
     970             : 
     971           0 :       } else {
     972           0 :         FD_LOG_CRIT(( "corrupt bstream record (seq=%lu cpair_style=%d)", seq, cpair_style ));
     973           0 :       }
     974             : 
     975           0 :       obj->rd_active = (short)0;
     976             : 
     977             :       /* Fill any trailing region with zeros (there is at least
     978             :          FD_VINYL_BSTREAM_FTR_SZ) and tell the client the item was
     979             :          successfully processed. */
     980             : 
     981           0 :       memset( val + val_sz, 0, fd_vinyl_data_szc_obj_footprint( (ulong)obj->szc )
     982           0 :                                - (sizeof(fd_vinyl_data_obj_t) + sizeof(fd_vinyl_bstream_phdr_t) + val_sz) );
     983             : 
     984           0 :       FD_COMPILER_MFENCE();
     985           0 :       *rd_err = (schar)FD_VINYL_SUCCESS;
     986           0 :       FD_COMPILER_MFENCE();
     987             : 
     988           0 :     }
     989             : 
     990           0 :     if( FD_UNLIKELY( append_cnt ) ) fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
     991             : 
     992           0 :     if( FD_LIKELY( comp_err<=0 ) ) fd_vinyl_cq_send( cq, comp, req_id, link_id, comp_err, batch_cnt, fail_cnt, quota_rem );
     993             : 
     994           0 :     client->quota_rem = quota_rem;
     995             : 
     996             :     /* Update metrics.  Derive counters from vinyl locals
     997             : 
     998             :       append_cnt is incremented in these places:
     999             :       - fd_vinyl_case_erase.c   (fd_vinyl_io_append_dead, with accum_dead_cnt)
    1000             :       - fd_vinyl_case_move.c    (fd_vinyl_io_append_move, with accum_move_cnt)
    1001             :       - fd_vinyl_case_move.c    (fd_vinyl_io_append(pair))
    1002             :       - fd_vinyl_case_release.c (fd_vinyl_io_append_pair_inplace)
    1003             :       - fd_vinyl_case_release.c (fd_vinyl_io_append_dead, with accum_dead_cnt)
    1004             : 
    1005             :       We can thus infer the number of pair blocks this request appended
    1006             :       by subtracting the dead blocks it appended (since dead_cnt_req0) */
    1007             : 
    1008           0 :     ulong const dead_cnt = accum_dead_cnt - dead_cnt_req0;
    1009           0 :     FD_MCNT_INC( ACCDB, BLOCKS_PAIR, append_cnt - dead_cnt );
    1010           0 :     FD_MCNT_INC( ACCDB, BLOCKS_DEAD, dead_cnt );
    1011             : 
    1012           0 :   }
    1013             : 
    1014           0 :   ctx->accum_dead_cnt    = accum_dead_cnt;
    1015           0 :   ctx->accum_garbage_cnt = accum_garbage_cnt;
    1016           0 :   ctx->accum_garbage_sz  = accum_garbage_sz;
    1017           0 : }
    1018             : /* Configure the stem run loop (fd_stem.c, included below) with this tile's context type and callbacks */
    1019           0 : #define STEM_BURST (1UL)
    1020           0 : #define STEM_LAZY  (10000) /* housekeep every 10 us */
    1021           0 : #define STEM_CALLBACK_CONTEXT_TYPE        fd_vinyl_tile_t
    1022           0 : #define STEM_CALLBACK_CONTEXT_ALIGN       fd_vinyl_align()
    1023           0 : #define STEM_CALLBACK_BEFORE_CREDIT       before_credit
    1024           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
    1025           0 : #define STEM_CALLBACK_METRICS_WRITE       metrics_write
    1026             : #define STEM_CALLBACK_SHOULD_SHUTDOWN     should_shutdown
    1027             : 
    1028             : #include "../../disco/stem/fd_stem.c"
    1029             : /* Tile descriptor for the accdb tile: sandbox policy, scratch sizing, init and run entry points */
    1030             : fd_topo_run_tile_t fd_tile_vinyl = {
    1031             :   .name                     = NAME,
    1032             :   .populate_allowed_fds     = populate_allowed_fds,
    1033             :   .populate_allowed_seccomp = populate_allowed_seccomp,
    1034             :   .scratch_align            = scratch_align,
    1035             :   .scratch_footprint        = scratch_footprint,
    1036             :   .privileged_init          = privileged_init,
    1037             :   .unprivileged_init        = unprivileged_init,
    1038             :   .run                      = stem_run,
    1039             : 
    1040             :   /* Depending on kernel version and file system, io_uring might spawn
    1041             :      kthreads to do write I/O.  Unless we set this, fd_sandbox sets
    1042             :      RLIMIT_NPROC to zero, which fails io_arm_poll_handler, which
    1043             :      bubbles up as an ECANCELED. */
    1044             :   .rlimit_nproc = 8UL
    1045             : };
    1046             : 
    1047             : #undef NAME

Generated by: LCOV version 1.14