Line data Source code
1 : /* Vinyl database server (Firedancer adaptation)
2 :
3 : This implementation is a fork of src/vinyl/fd_vinyl_exec with some
4 : Firedancer-specific changes:
5 : - All clients are joined on startup
6 : - Some errors (invalid link_id, invalid comp_gaddr) result in hard
7 : crashes instead of silent drops
8 : - Sandboxing */
9 :
10 : #define _GNU_SOURCE
11 : #include "../../disco/topo/fd_topo.h"
12 : #include "../../disco/metrics/fd_metrics.h"
13 : #include "../../discof/restore/fd_snapct_tile.h"
14 : #include "../../vinyl/fd_vinyl.h"
15 : #include "../../vinyl/fd_vinyl_base.h"
16 : #include "../../vinyl/io/ur/fd_vinyl_io_ur.h"
17 : #include "../../util/pod/fd_pod_format.h"
18 : #include "../../util/io_uring/fd_io_uring_setup.h"
19 : #include "../../util/io_uring/fd_io_uring_register.h"
20 : #include "generated/fd_accdb_tile_seccomp.h"
21 :
22 : #include <errno.h>
23 : #include <fcntl.h>
24 : #include <lz4.h>
25 : #include <sys/stat.h>
26 : #include <linux/io_uring.h>
27 :
28 : #define NAME "accdb"
29 : #define MAX_INS 8
30 :
31 : /* For io_ur backend, this controls the size of the write-back cache.
32 : This should be larger than the cumulative record size of all unique
33 : changed accounts in a slot. */
34 0 : #define IO_SPAD_MAX (128UL<<20)
35 :
36 : #define FD_VINYL_CLIENT_MAX (1024UL)
37 0 : #define FD_VINYL_REQ_MAX (1024UL)
38 :
/* fd_vinyl_client_t describes one client connection to this vinyl
   instance: its request/completion channels, the gaddr->laddr mapping
   for the region it shares with this tile, and its acquisition quota
   accounting. */

struct fd_vinyl_client {
  fd_vinyl_rq_t * rq;  /* Channel for requests from this client (could be shared by multiple vinyl instances) */
  fd_vinyl_cq_t * cq;  /* Channel for completions from this client to this vinyl instance
                          (could be shared by multiple receivers of completions from this vinyl instance). */
  ulong burst_max;     /* Max requests receive from this client at a time */
  ulong seq;           /* Sequence number of the next request to receive in the rq */
  ulong link_id;       /* Identifies requests from this client to this vinyl instance in the rq */
  ulong laddr0;        /* A valid non-zero gaddr from this client maps to the vinyl instance's laddr laddr0 + gaddr ... */
  ulong laddr1;        /* ... and thus is in (laddr0,laddr1). A zero gaddr maps to laddr NULL. */
  ulong quota_rem;     /* Num of remaining acquisitions this client is allowed on this vinyl instance */
  ulong quota_max;     /* Max quota */
};

typedef struct fd_vinyl_client fd_vinyl_client_t;
53 :
/* MAP_REQ_GADDR maps a request global address req_gaddr to an array of
   cnt T's into the local address space as a T * pointer.  If the result
   is not properly aligned or the entire range does not completely fall
   within the shared region with the client, returns NULL.  Likewise,
   gaddr 0 maps to NULL.  Assumes sizeof(T)*(n) does not overflow (which
   is true because n is at most batch_cnt, which is at most 2^32, and
   sizeof(T) is at most 40). */
61 :
62 0 : #define MAP_REQ_GADDR( gaddr, T, n ) ((T *)fd_vinyl_laddr( (gaddr), alignof(T), sizeof(T)*(n), client_laddr0, client_laddr1 ))
63 :
64 : FD_FN_CONST static inline void *
65 : fd_vinyl_laddr( ulong req_gaddr,
66 : ulong align,
67 : ulong footprint,
68 : ulong client_laddr0,
69 0 : ulong client_laddr1 ) {
70 0 : ulong req_laddr0 = client_laddr0 + req_gaddr;
71 0 : ulong req_laddr1 = req_laddr0 + footprint;
72 0 : return (void *)fd_ulong_if( (!!req_gaddr) & fd_ulong_is_aligned( req_laddr0, align ) &
73 0 : (client_laddr0<=req_laddr0) & (req_laddr0<=req_laddr1) & (req_laddr1<=client_laddr1),
74 0 : req_laddr0, 0UL );
75 0 : }
76 :
/* fd_vinyl_tile_t is the complete private state of the accdb tile:
   the vinyl database object and I/O backend memory, boot hand-off
   state, the client table, the pending request ring, garbage
   collection accounting, and periodic sync bookkeeping. */

struct fd_vinyl_tile {

  /* Vinyl objects */

  fd_vinyl_t vinyl[1];
  void * io_mem;  /* backing memory for the vinyl I/O backend (UR or BD) */

  /* Tile architecture */

  uint booted : 1;    /* set once the I/O backend is live and requests may be served */
  uint shutdown : 1;  /* set once a shutdown has been requested */
  struct {
    ulong state_expected;          /* snapshot loader state value that signals readiness */
    ulong volatile const * state;     /* loader state gauge (NULL when booting from genesis) */
    ulong volatile const * pair_cnt;  /* loader account count gauge (NULL when booting from genesis) */
    /* When booting from genesis only */
    struct {
      ulong io_seed;  /* secure random seed for initializing a fresh bstream */
    } from_genesis;
  } boot;

  /* I/O */

  int bstream_fd;        /* fd of the bstream database file (-1 until opened) */
  ulong bstream_file_sz; /* size of the bstream file at open, for capacity metrics */

  /* io_uring */

  fd_io_uring_t ring[1];
  void * ioring_shmem; /* shared between kernel and user */

  /* Clients */

  fd_vinyl_client_t _client[ FD_VINYL_CLIENT_MAX ];
  ulong client_cnt;  /* number of joined clients */
  ulong client_idx;  /* client polled most recently (round robin cursor) */

  /* Received requests */

  fd_vinyl_req_t _req[ FD_VINYL_REQ_MAX ];
  ulong req_head; /* Requests [0,req_head) have been processed */
  ulong req_tail; /* Requests [req_head,req_tail) are pending */
  /* Requests [req_tail,ULONG_MAX) have not been received */
  ulong exec_max; /* max requests to execute per run loop iteration */

  /* accum_dead_cnt is the number of dead blocks that have been
     written since the last partition block.

     accum_move_cnt is the number of move blocks that have been
     written since this last partition block.

     accum_garbage_cnt / sz is the number of items / bytes garbage in
     the bstream that have accumulated since the last time we compacted
     the bstream.  We use this to estimate the number of rounds of
     compaction to do in async handling. */

  ulong accum_dead_cnt;
  ulong accum_garbage_cnt;
  ulong accum_garbage_sz;

  /* Run loop state */

  ulong seq_part; /* bstream seq of the last appended partition block */

  /* Periodic syncing */

  long sync_next_ns; /* wallclock of the next bstream sync */

  /* Vinyl limit on the number of pairs the meta map will accept.
     Exceeding this limit will trigger a LOG_ERR. */
  ulong pair_cnt_limit;
};

typedef struct fd_vinyl_tile fd_vinyl_tile_t;
151 :
152 : /* Vinyl state object */
153 :
154 : static ulong
155 0 : scratch_align( void ) {
156 0 : return FD_SHMEM_HUGE_PAGE_SZ;
157 0 : }
158 :
/* fd_accdb_tile_layout_t records the byte offsets (relative to the
   tile's scratch base; the tile context itself sits at offset 0) of
   each sub-region carved out of scratch memory, plus the total
   footprint. */

struct fd_accdb_tile_layout {
  ulong footprint;          /* total scratch footprint in bytes */
  ulong io_off;             /* offset of the vinyl I/O backend memory */
  ulong io_uring_shmem_off; /* offset of the io_uring shared region (io_type UR only) */
  ulong vinyl_line_off;     /* offset of the vinyl line array */
};

typedef struct fd_accdb_tile_layout fd_accdb_tile_layout_t;
167 :
/* fd_accdb_tile_layout computes the scratch memory layout for tile.
   The I/O region shape depends on the configured io_type; an
   unrecognized io_type crashes.  Offsets are produced by dry-running
   the scratch allocator from a NULL base. */

static void
fd_accdb_tile_layout( fd_accdb_tile_layout_t * layout,
                      fd_topo_tile_t const * tile ) {
  memset( layout, 0, sizeof(fd_accdb_tile_layout_t) );

  /* Allocate from a NULL base so each APPEND yields an offset */
  FD_SCRATCH_ALLOC_INIT( l, NULL );
  ulong ctx_off = (ulong)FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_vinyl_tile_t), sizeof(fd_vinyl_tile_t) );
  FD_TEST( ctx_off==0UL ); /* context must be first so scratch base == ctx */

  switch( tile->accdb.io_type ) {
  case FD_VINYL_IO_TYPE_UR:
    layout->io_off = (ulong)FD_SCRATCH_ALLOC_APPEND(
        l, fd_vinyl_io_ur_align(), fd_vinyl_io_ur_footprint( IO_SPAD_MAX ) );
    /* SQ/CQ rings are shared with the kernel; huge page aligned */
    layout->io_uring_shmem_off = (ulong)FD_SCRATCH_ALLOC_APPEND(
        l, FD_SHMEM_HUGE_PAGE_SZ, fd_io_uring_shmem_footprint( tile->accdb.uring_depth, tile->accdb.uring_depth ) );
    break;
  case FD_VINYL_IO_TYPE_BD:
    layout->io_off = (ulong)FD_SCRATCH_ALLOC_APPEND(
        l, fd_vinyl_io_bd_align(), fd_vinyl_io_bd_footprint( IO_SPAD_MAX ) );
    break;
  default:
    FD_LOG_CRIT(( "invalid tile->accdb.io_type %d", tile->accdb.io_type ));
  }

  layout->vinyl_line_off = (ulong)FD_SCRATCH_ALLOC_APPEND(
      l, alignof(fd_vinyl_line_t), sizeof(fd_vinyl_line_t)*tile->accdb.line_max );
  layout->footprint = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
}
196 :
197 : static ulong
198 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
199 0 : fd_accdb_tile_layout_t layout[1];
200 0 : fd_accdb_tile_layout( layout, tile );
201 0 : return layout->footprint;
202 0 : }
203 :
204 : static ulong
205 : populate_allowed_fds( fd_topo_t const * topo,
206 : fd_topo_tile_t const * tile,
207 : ulong out_fds_cnt,
208 0 : int * out_fds ) {
209 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
210 :
211 0 : ulong out_cnt = 0;
212 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
213 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
214 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
215 0 : }
216 :
217 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
218 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
219 0 : fd_vinyl_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_vinyl_tile_t), sizeof(fd_vinyl_tile_t) );
220 :
221 0 : out_fds[ out_cnt++ ] = ctx->bstream_fd;
222 :
223 0 : if( ctx->ring->ioring_fd>=0 ) out_fds[ out_cnt++ ] = ctx->ring->ioring_fd;
224 :
225 0 : return out_cnt;
226 0 : }
227 :
228 : static ulong
229 : populate_allowed_seccomp( fd_topo_t const * topo,
230 : fd_topo_tile_t const * tile,
231 : ulong out_cnt,
232 0 : struct sock_filter * out ) {
233 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
234 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
235 0 : fd_vinyl_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_vinyl_tile_t), sizeof(fd_vinyl_tile_t) );
236 :
237 0 : populate_sock_filter_policy_fd_accdb_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->bstream_fd, (uint)ctx->ring->ioring_fd );
238 0 : return sock_filter_policy_fd_accdb_tile_instr_cnt;
239 0 : }
240 :
241 : static void
242 : vinyl_io_uring_init( fd_vinyl_tile_t * ctx,
243 : uint uring_depth,
244 0 : int dev_fd ) {
245 0 : fd_io_uring_params_t params[1];
246 0 : fd_io_uring_params_init( params, uring_depth );
247 :
248 : /* We busy poll the kernel syscall interface and use GETEVENTS.
249 : Therefore inhibit interrupt-driven completions. */
250 0 : params->flags |= IORING_SETUP_COOP_TASKRUN;
251 0 : params->features |= IORING_SETUP_DEFER_TASKRUN;
252 :
253 0 : if( FD_UNLIKELY( !fd_io_uring_init_shmem( ctx->ring, params, ctx->ioring_shmem, uring_depth, uring_depth ) ) ) {
254 0 : FD_LOG_ERR(( "fd_io_uring_init_shmem failed (%i-%s)", errno, fd_io_strerror( errno ) ));
255 0 : }
256 :
257 0 : if( FD_UNLIKELY( fd_io_uring_register_files( ctx->ring->ioring_fd, &dev_fd, 1 )<0 ) ) {
258 0 : FD_LOG_ERR(( "io_uring_register_files failed (%i-%s)", errno, fd_io_strerror( errno ) ));
259 0 : }
260 :
261 0 : fd_io_uring_restriction_t res[4] = {
262 0 : { .opcode = FD_IORING_RESTRICTION_SQE_OP,
263 0 : .sqe_op = IORING_OP_READ },
264 0 : { .opcode = FD_IORING_RESTRICTION_SQE_OP,
265 0 : .sqe_op = IORING_OP_WRITE },
266 0 : { .opcode = FD_IORING_RESTRICTION_SQE_FLAGS_REQUIRED,
267 0 : .sqe_flags = IOSQE_FIXED_FILE },
268 0 : { .opcode = FD_IORING_RESTRICTION_SQE_FLAGS_ALLOWED,
269 0 : .sqe_flags = 0 }
270 0 : };
271 0 : if( FD_UNLIKELY( fd_io_uring_register_restrictions( ctx->ring->ioring_fd, res, 4U )<0 ) ) {
272 0 : FD_LOG_ERR(( "io_uring_register_restrictions failed (%i-%s)", errno, fd_io_strerror( errno ) ));
273 0 : }
274 :
275 : /* Enable rings */
276 0 : if( FD_UNLIKELY( fd_io_uring_enable_rings( ctx->ring->ioring_fd )<0 ) ) {
277 0 : FD_LOG_ERR(( "io_uring_enable_rings failed (%i-%s)", errno, fd_io_strerror( errno ) ));
278 0 : }
279 0 : }
280 :
/* privileged_init runs before the sandbox is installed: validates the
   line cache sizing, lays out scratch memory, opens the bstream
   database file, and (for the UR backend) sets up the io_uring
   instance.  Also draws a secure random seed, used only if the tile
   later boots from genesis. */

static void
privileged_init( fd_topo_t * topo,
                 fd_topo_tile_t * tile ) {
  /* Reject a zero line_max or a line array footprint that overflows */
  ulong line_footprint;
  if( FD_UNLIKELY( !tile->accdb.line_max || __builtin_umull_overflow( tile->accdb.line_max, sizeof(fd_vinyl_line_t), &line_footprint ) ) ) {
    FD_LOG_ERR(( "invalid vinyl_line_max %lu", tile->accdb.line_max ));
  }

  fd_vinyl_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  ulong ctx_laddr = (ulong)ctx;

  /* Mark fds invalid until opened so later code (populate_allowed_*,
     during_housekeeping) can distinguish the UR and BD configurations */
  memset( ctx, 0, sizeof(fd_vinyl_tile_t) );
  ctx->bstream_fd = -1;
  ctx->ring->ioring_fd = -1;

  fd_accdb_tile_layout_t layout[1];
  fd_accdb_tile_layout( layout, tile );

  fd_vinyl_t * vinyl = ctx->vinyl;
  ctx->io_mem = (void *)( ctx_laddr + layout->io_off );

  if( tile->accdb.io_type==FD_VINYL_IO_TYPE_UR ) {
    ctx->ioring_shmem = (void *)( ctx_laddr + layout->io_uring_shmem_off );
  }

  fd_vinyl_line_t * _line = (void *)( ctx_laddr + layout->vinyl_line_off );

  vinyl->cnc = NULL;
  vinyl->io = NULL;  /* I/O backend is brought up at boot time in during_housekeeping */
  vinyl->line = (fd_vinyl_line_t *)_line;
  vinyl->line_footprint = line_footprint;

  /* FIXME use O_DIRECT? */
  FD_LOG_INFO(( "opening vinyl database file %s", tile->accdb.bstream_path ));
  int dev_fd = open( tile->accdb.bstream_path, O_RDWR|O_CLOEXEC );
  if( FD_UNLIKELY( dev_fd<0 ) ) FD_LOG_ERR(( "open(%s,O_RDWR|O_CLOEXEC) failed (%i-%s)", tile->accdb.bstream_path, errno, fd_io_strerror( errno ) ));

  ctx->bstream_fd = dev_fd;

  /* Record the file size for capacity metrics */
  struct stat st;
  if( FD_UNLIKELY( fstat( dev_fd, &st )<0 ) ) {
    FD_LOG_ERR(( "fstat on bstream fd failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }
  ctx->bstream_file_sz = (ulong)st.st_size;

  int io_type = tile->accdb.io_type;
  if( io_type==FD_VINYL_IO_TYPE_UR ) {
    vinyl_io_uring_init( ctx, tile->accdb.uring_depth, dev_fd );
  } else if( io_type!=FD_VINYL_IO_TYPE_BD ) {
    FD_LOG_ERR(( "Unsupported vinyl io_type %d", io_type ));
  }

  /* Only needed when booting from genesis */
  FD_TEST( fd_rng_secure( &ctx->boot.from_genesis.io_seed, 8UL ) );
}
336 :
/* unprivileged_init configures the vinyl object over the topology's
   shared meta/ele/data objects, decides between genesis and snapshot
   boot, and joins every "vinyl_rq" object this tile uses as a client
   (all clients are joined on startup in this Firedancer fork). */

static void
unprivileged_init( fd_topo_t * topo,
                   fd_topo_tile_t * tile ) {

  fd_vinyl_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  fd_vinyl_t * vinyl = ctx->vinyl;

  ctx->sync_next_ns = fd_log_wallclock();

  void * _meta = fd_topo_obj_laddr( topo, tile->accdb.meta_map_obj_id );
  void * _ele = fd_topo_obj_laddr( topo, tile->accdb.meta_pool_obj_id );
  void * _obj = fd_topo_obj_laddr( topo, tile->accdb.data_obj_id );

# define TEST( c ) do { if( FD_UNLIKELY( !(c) ) ) { FD_LOG_ERR(( "FAIL: %s", #c )); } } while(0)

  vinyl->cnc_footprint = 0UL;
  vinyl->meta_footprint = topo->objs[ tile->accdb.meta_map_obj_id ].footprint;
  vinyl->ele_footprint = topo->objs[ tile->accdb.meta_pool_obj_id ].footprint;
  vinyl->obj_footprint = topo->objs[ tile->accdb.data_obj_id ].footprint;

  void * obj_laddr0 = fd_wksp_containing( _obj );
  ulong part_thresh = 64UL<<20;
  ulong gc_thresh = 128UL<<20;
  int gc_eager = 2;

  /* Size the meta map from the element pool footprint (power of 2) */
  ulong ele_max = fd_ulong_pow2_dn( vinyl->ele_footprint / sizeof( fd_vinyl_meta_ele_t ) );

  ulong pair_max = ele_max - 1UL;
  ulong line_cnt = fd_ulong_min( vinyl->line_footprint / sizeof( fd_vinyl_line_t ), pair_max );

  TEST( (3UL<=line_cnt) & (line_cnt<=FD_VINYL_LINE_MAX) );

  /* seed is arb */

  /* part_thresh is arb */

  /* gc_thresh is arb */

  TEST( (-1<=gc_eager) & (gc_eager<=63) );

  vinyl->line_cnt = line_cnt;
  vinyl->pair_max = pair_max;

  vinyl->part_thresh = part_thresh;
  vinyl->gc_thresh = gc_thresh;
  vinyl->gc_eager = gc_eager;
  vinyl->style = FD_VINYL_BSTREAM_CTL_STYLE_RAW;
  vinyl->line_idx_lru = 0U;
  vinyl->pair_cnt = 0UL;
  vinyl->garbage_sz = 0UL;

  TEST( fd_vinyl_meta_join( vinyl->meta, _meta, _ele )==vinyl->meta );

  TEST( fd_vinyl_data_init( vinyl->data, _obj, vinyl->obj_footprint, obj_laddr0 )==vinyl->data );
  fd_vinyl_data_reset( NULL, 0UL, 0UL, 0, vinyl->data );

  /* Initialize the line cache: empty lines linked into a circular
     older/newer list for LRU-style reuse */
  fd_vinyl_line_t * line = vinyl->line;
  for( ulong line_idx=0UL; line_idx<line_cnt; line_idx++ ) {
    line[ line_idx ].obj = NULL;
    line[ line_idx ].ele_idx = ULONG_MAX;
    line[ line_idx ].ctl = fd_vinyl_line_ctl( 0UL, 0L);
    line[ line_idx ].line_idx_older = (uint)fd_ulong_if( line_idx!=0UL, line_idx-1UL, line_cnt-1UL );
    line[ line_idx ].line_idx_newer = (uint)fd_ulong_if( line_idx!=line_cnt-1UL, line_idx+1UL, 0UL );
  }

# undef TEST

  /* Boot mode: no snapwm (snapshot loader) tile in the topology means
     boot from genesis; otherwise wait for the loader */
  ulong snapwm_tile_idx = fd_topo_find_tile( topo, "snapwm", 0UL );
  ulong genesi_tile_idx = fd_topo_find_tile( topo, "genesi", 0UL );
  int boot_from_genesis = snapwm_tile_idx==ULONG_MAX;

  if( FD_UNLIKELY( boot_from_genesis ) ) {
    if( FD_UNLIKELY( genesi_tile_idx==ULONG_MAX ) ) {
      FD_LOG_CRIT(( "booting from genesis requires a genesi tile, but none was found in the topology (genesi tile idx %lu)", genesi_tile_idx ));
    }
    if( FD_UNLIKELY( snapwm_tile_idx!=ULONG_MAX ) ) {
      FD_LOG_CRIT(( "booting from genesis with snapshot load tiles idx %lu is not supported", snapwm_tile_idx ));
    }
    /* When booting from genesis, accdb tile boots immediately, which
       allows the genesi tile to become a vinyl client. */
    ctx->boot.state_expected = ULONG_MAX;
    ctx->boot.state = NULL;
    ctx->boot.pair_cnt = NULL;
    /* Initialize the bstream sync block: there is no need to keep
       the return (fd_vinyl_io_t *), but it must be checked. */
    FD_TEST( !!fd_vinyl_io_bd_init( ctx->io_mem, IO_SPAD_MAX, ctx->bstream_fd, 1/*reset*/, "accounts-genesis", 17UL, ctx->boot.from_genesis.io_seed ) );
  } else {
    if( FD_UNLIKELY( snapwm_tile_idx==ULONG_MAX ) ) {
      FD_LOG_CRIT(( "booting with incorrect snapshot load tiles idx snapwm %lu", snapwm_tile_idx ));
    }
    /* Boot state and expected state */
    fd_topo_tile_t const * snapwm_tile = &topo->tiles[ snapwm_tile_idx ];
    FD_TEST( snapwm_tile->metrics );
    ctx->boot.state_expected = 2;
    ctx->boot.state = &fd_metrics_tile( snapwm_tile->metrics )[ MIDX( GAUGE, TILE, STATUS ) ];
    ctx->boot.pair_cnt = &fd_metrics_tile( snapwm_tile->metrics )[ MIDX( GAUGE, SNAPWM, ACCOUNTS_ACTIVE ) ];
  }

  /* Vinyl limit on the number of pairs the meta map will accept */
  ctx->pair_cnt_limit = tile->accdb.pair_cnt_limit;
  FD_TEST( ctx->pair_cnt_limit!=ULONG_MAX );

  /* Discover mapped clients */

  ulong burst_free = FD_VINYL_REQ_MAX;
  ulong quota_free = vinyl->line_cnt - 1UL;
  ctx->exec_max = 0UL;

  for( ulong i=0UL; i<(tile->uses_obj_cnt); i++ ) {

    /* Every "vinyl_rq" object this tile uses describes one client */
    ulong rq_obj_id = tile->uses_obj_id[ i ];
    fd_topo_obj_t const * rq_obj = &topo->objs[ rq_obj_id ];
    if( strcmp( rq_obj->name, "vinyl_rq" ) ) continue;

    if( FD_UNLIKELY( ctx->client_cnt>=FD_VINYL_CLIENT_MAX ) ) {
      FD_LOG_ERR(( "too many vinyl clients (increase FD_VINYL_CLIENT_MAX)" ));
    }

    ulong burst_max = 1UL;
    ulong link_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.link_id", rq_obj_id );
    ulong quota_max = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.quota_max", rq_obj_id );
    ulong req_pool_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.req_pool_obj_id", rq_obj_id );
    ulong cq_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "obj.%lu.cq_obj_id", rq_obj_id );
    FD_TEST( link_id !=ULONG_MAX );
    FD_TEST( quota_max !=ULONG_MAX );
    FD_TEST( req_pool_obj_id!=ULONG_MAX );
    FD_TEST( cq_obj_id !=ULONG_MAX );

    if( FD_UNLIKELY( burst_max > burst_free ) ) {
      FD_LOG_ERR(( "too large burst_max (increase FD_VINYL_REQ_MAX or decrease burst_max)" ));
    }

    if( FD_UNLIKELY( quota_max > fd_ulong_min( quota_free, FD_VINYL_COMP_QUOTA_MAX ) ) ) {
      FD_LOG_ERR(( "too large quota_max (increase line_cnt (currently %lu, free %lu) or decrease quota_max (currently %lu))",
                   vinyl->line_cnt, quota_free, quota_max ));
    }

    /* link_ids must be unique across clients */
    for( ulong client_idx=0UL; client_idx<ctx->client_cnt; client_idx++ ) {
      if( FD_UNLIKELY( ctx->_client[ client_idx ].link_id==link_id ) ) {
        FD_LOG_ERR(( "client already joined with this link_id (%lu)", link_id ));
      }
    }

    fd_topo_obj_t const * req_pool_obj = &topo->objs[ req_pool_obj_id ];
    fd_topo_wksp_t const * client_wksp = &topo->workspaces[ req_pool_obj->wksp_id ];

    fd_vinyl_rq_t * rq; FD_TEST( (rq = fd_vinyl_rq_join( fd_topo_obj_laddr( topo, rq_obj_id ) )) );
    fd_vinyl_cq_t * cq; FD_TEST( (cq = fd_vinyl_cq_join( fd_topo_obj_laddr( topo, cq_obj_id ) )) );

    /* The client's whole workspace is its gaddr-addressable region */
    fd_shmem_join_info_t join_info;
    FD_TEST( fd_shmem_join_query_by_join( client_wksp->wksp, &join_info)==0 );
    FD_LOG_INFO(( "registered client %lu: req_gaddr=%s:%lu cq_gaddr=%s:%lu",
                  ctx->client_cnt,
                  fd_wksp_containing( rq )->name, fd_wksp_gaddr_fast( fd_wksp_containing( rq ), rq ),
                  fd_wksp_containing( cq )->name, fd_wksp_gaddr_fast( fd_wksp_containing( cq ), cq ) ));
    ctx->_client[ ctx->client_cnt ] = (fd_vinyl_client_t) {
      .rq = rq,
      .cq = cq,
      .burst_max = 1UL,
      .seq = 0UL,
      .link_id = link_id,
      .laddr0 = (ulong)join_info.shmem,
      .laddr1 = (ulong)join_info.shmem + join_info.page_cnt*join_info.page_sz,
      .quota_rem = quota_max,
      .quota_max = quota_max
    };
    ctx->client_cnt++;

    quota_free -= quota_max;
    burst_free -= burst_max;

    /* Every client_cnt run loop iterations we receive at most:

         sum_clients recv_max = FD_VINYL_RECV_MAX - burst_free

       requests.  To guarantee we process requests fast enough
       that we never overrun our receive queue, under maximum
       client load, we need to process:

         sum_clients recv_max / client_cnt

       requests per run loop iteration.  We thus set exec_max
       to the ceil sum_clients recv_max / client_cnt. */

    ctx->exec_max = (FD_VINYL_REQ_MAX - burst_free + ctx->client_cnt - 1UL) / ctx->client_cnt;

  } /* client join loop */

}
526 :
527 : /* during_housekeeping is called periodically (approx every STEM_LAZY ns) */
528 :
529 : static void
530 0 : during_housekeeping( fd_vinyl_tile_t * ctx ) {
531 :
532 0 : fd_vinyl_t * vinyl = ctx->vinyl;
533 :
534 0 : if( FD_UNLIKELY( !ctx->booted ) ) {
535 0 : if( FD_UNLIKELY( !!ctx->boot.state ) ) {
536 0 : ulong const boot_state = FD_VOLATILE_CONST( *ctx->boot.state );
537 0 : if( boot_state!=ctx->boot.state_expected ) {
538 0 : fd_log_sleep( 1e6 ); /* 1 ms */
539 0 : return;
540 0 : }
541 0 : }
542 :
543 0 : if( ctx->ring->ioring_fd!=-1 ) {
544 0 : vinyl->io = fd_vinyl_io_ur_init( ctx->io_mem, IO_SPAD_MAX, ctx->bstream_fd, ctx->ring );
545 0 : if( FD_UNLIKELY( !vinyl->io ) ) FD_LOG_ERR(( "Failed to initialize io_uring I/O backend for account database" ));
546 0 : } else {
547 0 : vinyl->io = fd_vinyl_io_bd_init( ctx->io_mem, IO_SPAD_MAX, ctx->bstream_fd, 0, NULL, 0UL, 0UL );
548 0 : if( FD_UNLIKELY( !vinyl->io ) ) FD_LOG_ERR(( "Failed to initialize blocking I/O backend for account database" ));
549 0 : }
550 0 : vinyl->pair_cnt = 0UL;
551 0 : if( !!ctx->boot.pair_cnt ) {
552 0 : vinyl->pair_cnt = FD_VOLATILE_CONST( *ctx->boot.pair_cnt );
553 0 : }
554 :
555 : /* Once snapct tile exits, boot up vinyl */
556 0 : FD_LOG_INFO(( "vinyl server starting with %lu active records", vinyl->pair_cnt ));
557 :
558 0 : ctx->booted = 1;
559 0 : }
560 :
561 : /* If we've written enough to justify appending a parallel
562 : recovery partition, append one. */
563 :
564 0 : ulong seq_future = fd_vinyl_io_seq_future( vinyl->io );
565 0 : if( FD_UNLIKELY( (seq_future - ctx->seq_part) > vinyl->part_thresh ) ) {
566 :
567 0 : ulong seq = fd_vinyl_io_append_part( vinyl->io, ctx->seq_part, ctx->accum_dead_cnt, 0UL, NULL, 0UL );
568 0 : FD_CRIT( fd_vinyl_seq_eq( seq, seq_future ), "corruption detected" );
569 0 : ctx->seq_part = seq + FD_VINYL_BSTREAM_BLOCK_SZ;
570 :
571 0 : ctx->accum_dead_cnt = 0UL;
572 :
573 0 : ctx->accum_garbage_cnt++;
574 0 : ctx->accum_garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
575 :
576 0 : fd_vinyl_io_commit( vinyl->io, FD_VINYL_IO_FLAG_BLOCKING );
577 0 : FD_MCNT_INC( ACCDB, BLOCKS_PART, 1UL );
578 :
579 0 : }
580 :
581 : /* Let the number of items of garbage generated since the last
582 : compaction be accum_garbage_cnt and let the steady steady
583 : average number of live / garbage items in the bstream's past be
584 : L / G (i.e. L is the average value of pair_cnt). The average
585 : number pieces of garbage collected per garbage collection round
586 : is thus G / (L + G). If we do compact_max rounds garbage
587 : collection this async handling, we expect to collect
588 :
589 : compact_max G / (L + G)
590 :
591 : items of garbage on average. To make sure we collect garbage
592 : faster than we generate it on average, we then require:
593 :
594 : accum_garbage_cnt <~ compact_max G / (L + G)
595 : -> compact_max >~ (L + G) accum_garbage_cnt / G
596 :
597 : Let the be 2^-gc_eager be the maximum fraction of items in the
598 : bstream's past we are willing tolerate as garbage on average.
599 : We then have G = 2^-gc_eager (L + G). This implies:
600 :
601 : -> compact_max >~ accum_garbage_cnt 2^gc_eager
602 :
603 : When accum_garbage_cnt is 0, we use a compact_max of 1 to do
604 : compaction rounds at a minimum rate all the time. This allows
605 : transients (e.g. a sudden change to new steady state
606 : equilibrium, temporary disabling of garbage collection at key
607 : times for highest performance, etc) and unaccounted zero
608 : padding garbage to be absorbed when nothing else is going on. */
609 :
610 0 : int gc_eager = vinyl->gc_eager;
611 0 : if( FD_LIKELY( gc_eager>=0 ) ) {
612 :
613 : /* Saturating wide left shift */
614 0 : ulong overflow = (ctx->accum_garbage_cnt >> (63-gc_eager) >> 1); /* sigh ... avoid wide shift UB */
615 0 : ulong compact_max = fd_ulong_max( fd_ulong_if( !overflow, ctx->accum_garbage_cnt << gc_eager, ULONG_MAX ), 1UL );
616 :
617 : /**/ ctx->accum_garbage_cnt = 0UL;
618 0 : vinyl->garbage_sz += ctx->accum_garbage_sz; ctx->accum_garbage_sz = 0UL;
619 :
620 0 : ulong garbage_pre = vinyl->garbage_sz;
621 0 : fd_vinyl_compact( vinyl, compact_max );
622 0 : FD_MCNT_INC( ACCDB, CUM_GC_BYTES, garbage_pre - vinyl->garbage_sz );
623 :
624 0 : }
625 :
626 : /* Update vinyl sync block
627 : (Required to reclaim bstream space freed by compaction) */
628 :
629 0 : long now = fd_log_wallclock();
630 0 : if( now >= ctx->sync_next_ns ) {
631 0 : ctx->sync_next_ns = now + (long)30e9; /* every 30 seconds */
632 0 : fd_vinyl_io_sync( vinyl->io, FD_VINYL_IO_FLAG_BLOCKING );
633 0 : }
634 :
635 : /* Service io_uring instance */
636 :
637 0 : if( ctx->ring->ioring_fd!=-1 ) {
638 0 : uint sq_drops = fd_io_uring_sq_dropped( ctx->ring->sq );
639 0 : if( FD_UNLIKELY( sq_drops ) ) {
640 0 : FD_LOG_CRIT(( "kernel io_uring dropped I/O requests, cannot continue (sq_dropped=%u)", sq_drops ));
641 0 : }
642 :
643 0 : uint cq_drops = fd_io_uring_cq_overflow( ctx->ring->cq );
644 0 : if( FD_UNLIKELY( cq_drops ) ) {
645 0 : FD_LOG_CRIT(( "kernel io_uring dropped I/O completions, cannot continue (cq_overflow=%u)", cq_drops ));
646 0 : }
647 0 : }
648 :
649 : /* Keep an eye on vinyl meta map utilization (pair_cnt) */
650 0 : if( FD_UNLIKELY( vinyl->pair_cnt > ctx->pair_cnt_limit ) ) {;
651 0 : FD_LOG_ERR(( "accdb accounts count %lu exceeded limit %lu", vinyl->pair_cnt, ctx->pair_cnt_limit ));
652 0 : }
653 0 : }
654 :
655 : /* If should_shutdown returns non-zero, the vinyl tile is shut down */
656 :
657 : static int
658 0 : should_shutdown( fd_vinyl_tile_t * ctx ) {
659 0 : if( FD_UNLIKELY( !ctx->booted ) ) return 0;
660 0 : if( FD_LIKELY( !ctx->shutdown ) ) return 0;
661 :
662 0 : fd_vinyl_t * vinyl = ctx->vinyl;
663 0 : fd_vinyl_io_t * io = vinyl->io;
664 :
665 0 : ulong discard_cnt = ctx->req_tail - ctx->req_head;
666 :
667 : /* Append the final partition and sync so we can resume with a fast
668 : parallel recovery */
669 :
670 0 : FD_MCNT_INC( ACCDB, BLOCKS_PART, 1UL );
671 0 : fd_vinyl_io_append_part( io, ctx->seq_part, ctx->accum_dead_cnt, 0UL, NULL, 0UL );
672 :
673 0 : ctx->accum_dead_cnt = 0UL;
674 :
675 0 : ctx->accum_garbage_cnt++;
676 0 : ctx->accum_garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
677 :
678 0 : fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
679 :
680 0 : fd_vinyl_io_sync( io, FD_VINYL_IO_FLAG_BLOCKING );
681 :
682 : /* Drain outstanding accumulators */
683 :
684 : /**/ ctx->accum_garbage_cnt = 0UL;
685 0 : vinyl->garbage_sz += ctx->accum_garbage_sz; ctx->accum_garbage_sz = 0UL;
686 :
687 : /* Disconnect from the clients */
688 :
689 0 : ulong released_cnt = 0UL;
690 0 : for( ulong client_idx=0UL; client_idx<ctx->client_cnt; client_idx++ ) {
691 0 : released_cnt += (ctx->_client[ client_idx ].quota_max - ctx->_client[ client_idx ].quota_rem);
692 0 : }
693 :
694 0 : if( FD_UNLIKELY( discard_cnt ) ) FD_LOG_WARNING(( "halt discarded %lu received requests", discard_cnt ));
695 0 : if( FD_UNLIKELY( released_cnt ) ) FD_LOG_WARNING(( "halt released %lu outstanding acquires", released_cnt ));
696 0 : if( FD_UNLIKELY( ctx->client_cnt ) ) FD_LOG_WARNING(( "halt disconneced %lu clients", ctx->client_cnt ));
697 :
698 0 : return 1;
699 0 : }
700 :
701 : static void
702 0 : metrics_write( fd_vinyl_tile_t * ctx ) {
703 0 : if( FD_UNLIKELY( !ctx->booted ) ) return;
704 0 : fd_vinyl_t * vinyl = ctx->vinyl;
705 0 : fd_vinyl_io_t * io = vinyl->io;
706 :
707 0 : ulong vinyl__pair_cnt_left = fd_ulong_sat_sub( ctx->pair_cnt_limit, vinyl->pair_cnt );
708 0 : FD_MGAUGE_SET( ACCDB, ACCOUNTS, vinyl->pair_cnt );
709 0 : FD_MGAUGE_SET( ACCDB, ACCOUNT_INDEX_REMAINING_FREE, vinyl__pair_cnt_left );
710 :
711 0 : FD_MCNT_SET( ACCDB, READ_OPS_IO_CACHE, io->cache_read_cnt );
712 0 : FD_MCNT_SET( ACCDB, READ_BYTES_IO_CACHE, io->cache_read_tot_sz );
713 0 : FD_MCNT_SET( ACCDB, WRITE_OPS_IO_CACHE, io->cache_write_cnt );
714 0 : FD_MCNT_SET( ACCDB, WRITE_BYTES_IO_CACHE, io->cache_write_tot_sz );
715 0 : FD_MCNT_SET( ACCDB, READ_OPS_FILE, io->file_read_cnt );
716 0 : FD_MCNT_SET( ACCDB, READ_BYTES_FILE, io->file_read_tot_sz );
717 0 : FD_MCNT_SET( ACCDB, WRITE_OPS_FILE, io->file_write_cnt );
718 0 : FD_MCNT_SET( ACCDB, WRITE_BYTES_FILE, io->file_write_tot_sz );
719 :
720 0 : FD_MGAUGE_SET( ACCDB, FILE_CAPACITY_BYTES, ctx->bstream_file_sz );
721 0 : FD_MGAUGE_SET( ACCDB, FILE_USED_BYTES, io->seq_future - io->seq_ancient );
722 :
723 0 : FD_MGAUGE_SET( ACCDB, BSTREAM_SEQ_ANCIENT, io->seq_ancient );
724 0 : FD_MGAUGE_SET( ACCDB, BSTREAM_SEQ_PAST, io->seq_past );
725 0 : FD_MGAUGE_SET( ACCDB, BSTREAM_SEQ_PRESENT, io->seq_present );
726 0 : FD_MGAUGE_SET( ACCDB, BSTREAM_SEQ_FUTURE, io->seq_future );
727 :
728 0 : FD_MGAUGE_SET( ACCDB, GARBAGE_BYTES, vinyl->garbage_sz );
729 0 : }
730 :
731 : /* before_credit runs every main loop iteration */
732 :
static void
before_credit( fd_vinyl_tile_t * ctx,
               fd_stem_context_t * stem,
               int * charge_busy ) {
  (void)stem;

  /* Nothing to do until the vinyl instance has finished booting. */

  if( FD_UNLIKELY( !ctx->booted ) ) return;

  fd_vinyl_t * vinyl = ctx->vinyl;

  /* Hoist hot vinyl fields into locals.  Note that several of these
     (pair_max, ele0, meta_seed, lock, lock_shift, data_laddr0, ...)
     are not referenced directly in this function body; they are
     consumed by the fd_vinyl_case_*.c fragments #include-d into the
     request dispatch switch below.  Do not remove them. */

  fd_vinyl_io_t * io = vinyl->io;
  fd_vinyl_meta_t * meta = vinyl->meta;
  fd_vinyl_line_t * line = vinyl->line;
  fd_vinyl_data_t * data = vinyl->data;

  ulong pair_max = vinyl->pair_max;

  fd_vinyl_meta_ele_t * ele0 = meta->ele;
  ulong ele_max = meta->ele_max;
  ulong meta_seed = meta->seed;
  ulong * lock = meta->lock;
  int lock_shift = meta->lock_shift;

  ulong data_laddr0 = (ulong)data->laddr0;
  fd_vinyl_data_vol_t const * vol = data->vol;
  ulong vol_cnt = data->vol_cnt;

  ulong line_cnt = vinyl->line_cnt;

  /* Select the client to poll this run loop iteration (round robin
     over all joined clients, one client per call). */

  ctx->client_idx = fd_ulong_if( ctx->client_idx+1UL<ctx->client_cnt, ctx->client_idx+1UL, 0UL );

  fd_vinyl_client_t * client = ctx->_client + ctx->client_idx;

  fd_vinyl_rq_t * rq = client->rq;
  ulong seq = client->seq;
  ulong burst_max = client->burst_max;
  ulong link_id = client->link_id;

  /* Snapshot the accumulators updated by the case fragments so the
     per-request metric deltas below can be derived from them. */

  ulong accum_dead_cnt = ctx->accum_dead_cnt;
  ulong accum_garbage_cnt = ctx->accum_garbage_cnt;
  ulong accum_garbage_sz = ctx->accum_garbage_sz;

  /* Enqueue up to burst_max requests from this client into the
     local request queue (bounded additionally by the free space in
     the FD_VINYL_REQ_MAX deep pending ring).  Using
     burst_max << FD_VINYL_REQ_MAX allows applications to prevent a
     bursty client from starving other clients of resources while
     preserving the spatial and temporal locality of reasonably sized
     O(burst_max) bursts from an individual client in the processing
     below. */

  for( ulong recv_rem=fd_ulong_min( FD_VINYL_REQ_MAX-(ctx->req_tail-ctx->req_head), burst_max ); recv_rem; recv_rem-- ) {
    fd_vinyl_req_t * req = ctx->_req + (ctx->req_tail & (FD_VINYL_REQ_MAX-1UL));

    long diff = fd_vinyl_rq_recv( rq, seq, req );

    if( FD_LIKELY( diff>0L ) ) break; /* No requests waiting in rq at this time */

    /* diff<0 means the producer lapped us before we copied out the
       request; the request stream is no longer recoverable. */

    if( FD_UNLIKELY( diff ) ) FD_LOG_CRIT(( "client overran request queue" ));

    *charge_busy = 1;
    seq++;

    /* We got the next request.  Decide if we should accept it.

       Requests whose link_id does not match this client's link_id
       (e.g. an unknown link_id, or one matching a different client)
       cannot be trusted: we don't know where, or even whether, it is
       safe to send the completion.  Even if the request provided an
       out-of-band completion location (comp_gaddr!=0), we have no
       reason to trust it given the mismatch.  Upstream vinyl drops
       such requests silently; this Firedancer adaptation treats them
       as a fatal protocol violation (see file header).

       A matching link_id also gives a mechanism for a client to use
       a single rq to send requests to multiple vinyl instances: the
       client uses a different link_id per instance and each instance
       filters out the requests not addressed to it.

       Since we know the client_idx at this point, given a matching
       link_id, we stash the client_idx in the pending req's link_id
       field to eliminate the need to maintain a link_id<>client_idx
       map in the execution loop below. */

    if( FD_UNLIKELY( req->link_id!=link_id ) ) {
      FD_LOG_CRIT(( "received request from link_id %lu, but request specifies incorrect link_id %lu",
                    link_id, req->link_id ));
    }

    req->link_id = ctx->client_idx;

    ctx->req_tail++;
  }

  client->seq = seq;

  /* Execute up to exec_max received requests (potentially from
     multiple clients enqueued over previous iterations). */

  for( ulong exec_rem=fd_ulong_min( ctx->req_tail-ctx->req_head, ctx->exec_max ); exec_rem; exec_rem-- ) {
    fd_vinyl_req_t * req = ctx->_req + ((ctx->req_head++) & (FD_VINYL_REQ_MAX-1UL));

    /* Determine the client that sent this request and unpack the
       completion fields.  A request with a non-NULL but unmappable
       out-of-band completion gaddr cannot have its completion sent
       in the expected manner and, in lieu of that, the receivers
       aren't expecting any completion to come via the cq (if any);
       this adaptation treats that as fatal.  Note that this implies
       requests that don't produce a completion (e.g. FETCH and
       FLUSH) need to either provide NULL or a valid non-NULL
       location for comp_gaddr to pass this validation (this is not
       a burden practically). */

    ulong req_id = req->req_id;
    ulong client_idx = req->link_id; /* See note above about link_id / client_idx conversion */
    ulong batch_cnt = (ulong)req->batch_cnt;
    ulong comp_gaddr = req->comp_gaddr;

    fd_vinyl_client_t * client = ctx->_client + client_idx;

    fd_vinyl_cq_t * cq = client->cq;
    ulong link_id = client->link_id;
    ulong client_laddr0 = client->laddr0; /* consumed by the case fragments for gaddr mapping */
    ulong client_laddr1 = client->laddr1;
    ulong quota_rem = client->quota_rem;

    FD_CRIT( quota_rem<=client->quota_max, "corruption detected" );

    fd_vinyl_comp_t * comp = MAP_REQ_GADDR( comp_gaddr, fd_vinyl_comp_t, 1UL );
    if( FD_UNLIKELY( (!comp) & (!!comp_gaddr) ) ) {
      FD_LOG_CRIT(( "client with link_id=%lu requested completion at invalid gaddr %lu",
                    link_id, comp_gaddr ));
    }

    /* comp_err>0 means "no completion to send" (see cq send below);
       the case fragments set it to <=0 when a completion is due. */

    int comp_err = 1;
    ulong fail_cnt = 0UL;

    ulong read_cnt = 0UL;   /* number of async bstream reads issued by the case fragment */
    ulong append_cnt = 0UL; /* number of bstream appends issued by the case fragment */

    ulong accum_cache_hit = 0UL;
    switch( req->type ) {

# include "../../vinyl/fd_vinyl_case_acquire.c"
# include "../../vinyl/fd_vinyl_case_release.c"
# include "../../vinyl/fd_vinyl_case_erase.c"
    /* FIXME support more request types */

    default:
      FD_LOG_CRIT(( "unsupported request type %u", (uint)req->type ));
      comp_err = FD_VINYL_ERR_INVAL; /* unreachable in practice (FD_LOG_CRIT aborts); kept for form */
      break;
    }

    /* Per-request-type metrics.  No default needed: an unsupported
       type was already rejected by the dispatch switch above. */

    FD_MCNT_INC( ACCDB, REQUEST_BATCHES, 1UL );
    switch( req->type ) {
    case FD_VINYL_REQ_TYPE_ACQUIRE:
      FD_MCNT_INC( ACCDB, REQUESTS_ACQUIRE, batch_cnt );
      FD_MCNT_INC( ACCDB, READ_OPS_SHARED_CACHE, accum_cache_hit );
      break;
    case FD_VINYL_REQ_TYPE_RELEASE:
      /* FIXME missing metrics:
         - ReadBytes(SharedCache)
         - WriteOps(SharedCache)
         - WriteBytes(SharedCache) */
      FD_MCNT_INC( ACCDB, REQUESTS_RELEASE, batch_cnt );
      break;
    case FD_VINYL_REQ_TYPE_ERASE:
      FD_MCNT_INC( ACCDB, REQUESTS_ERASE, batch_cnt );
      break;
    }

    /* Drain the async reads issued above: for each completed read,
       validate the on-disk pair against the cached metadata,
       decompress it into the destination object if needed, and then
       publish success to the waiting client via rd_err. */

    for( ; read_cnt; read_cnt-- ) {
      fd_vinyl_io_rd_t * _rd; /* avoid pointer escape */
      fd_vinyl_io_poll( io, &_rd, FD_VINYL_IO_FLAG_BLOCKING );
      fd_vinyl_io_rd_t * rd = _rd;

      fd_vinyl_data_obj_t * obj = (fd_vinyl_data_obj_t *) rd->ctx; /* destination (uncompressed) object */
      ulong seq = rd->seq;
      fd_vinyl_bstream_phdr_t * cphdr = (fd_vinyl_bstream_phdr_t *)rd->dst;
      ulong cpair_sz = rd->sz; (void)cpair_sz;

      /* The read landed at the start of a (possibly compressed)
         staging object; recover that object from the destination
         address. */

      fd_vinyl_data_obj_t * cobj = (fd_vinyl_data_obj_t *)fd_ulong_align_dn( (ulong)rd, FD_VINYL_BSTREAM_BLOCK_SZ );

      FD_CRIT( cphdr==fd_vinyl_data_obj_phdr( cobj ), "corruption detected" );

      ulong cpair_ctl = cphdr->ctl;

      int cpair_type = fd_vinyl_bstream_ctl_type ( cpair_ctl );
      int cpair_style = fd_vinyl_bstream_ctl_style( cpair_ctl );
      ulong cpair_val_esz = fd_vinyl_bstream_ctl_sz ( cpair_ctl );

      FD_CRIT( cpair_type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR, "corruption detected" );
      FD_CRIT( cpair_sz ==fd_vinyl_bstream_pair_sz( cpair_val_esz ), "corruption detected" );

      schar * rd_err = cobj->rd_err;

      FD_CRIT ( rd_err, "corruption detected" );
      FD_ALERT( fd_vinyl_data_is_valid_obj( obj, vol, vol_cnt ), "corruption detected" );

      /* Cross-check the read against the cache line and meta element
         it was issued for. */

      ulong line_idx = obj->line_idx;

      FD_CRIT( line_idx<line_cnt, "corruption detected" );
      FD_CRIT( line[ line_idx ].obj==obj, "corruption detected" );

      ulong ele_idx = line[ line_idx ].ele_idx;

      FD_CRIT ( ele_idx<ele_max, "corruption detected" );
      FD_ALERT( !memcmp( &ele0[ ele_idx ].phdr, cphdr, sizeof(fd_vinyl_bstream_phdr_t) ), "corruption detected" );
      FD_CRIT ( ele0[ ele_idx ].seq ==seq, "corruption detected" );
      FD_CRIT ( ele0[ ele_idx ].line_idx==line_idx, "corruption detected" );

      /* Decode the pair */

      char * val = (char *)fd_vinyl_data_obj_val( obj );
      ulong val_sz = (ulong)cphdr->info.val_sz;

      FD_CRIT( val_sz <= FD_VINYL_VAL_MAX, "corruption detected" );
      FD_CRIT( fd_vinyl_data_obj_val_max( obj ) >= val_sz, "corruption detected" );

      if( FD_LIKELY( cpair_style==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) {

        /* Stored raw: the read landed directly in the destination
           object, nothing to decode. */

        FD_CRIT( obj==cobj, "corruption detected" );
        FD_CRIT( cpair_val_esz==val_sz, "corruption detected" );

      } else if( cpair_style==FD_VINYL_BSTREAM_CTL_STYLE_LZ4 ) {

        /* Stored LZ4 compressed: decompress the staging object into
           the destination object, free the staging object and
           rewrite the destination header as a RAW pair. */

        char const * cval = (char const *)fd_vinyl_data_obj_val( cobj );
        ulong cval_sz = fd_vinyl_bstream_ctl_sz( cpair_ctl );

        ulong _val_sz = (ulong)LZ4_decompress_safe( cval, val, (int)cval_sz, (int)val_sz );
        if( FD_UNLIKELY( _val_sz!=val_sz ) ) FD_LOG_CRIT(( "LZ4_decompress_safe failed" ));

        fd_vinyl_data_free( data, cobj );

        fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );

        phdr->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
        phdr->key = cphdr->key;
        phdr->info = cphdr->info;

      } else {
        FD_LOG_CRIT(( "corrupt bstream record (seq=%lu cpair_style=%d)", seq, cpair_style ));
      }

      obj->rd_active = (short)0;

      /* Fill any trailing region with zeros (there is at least
         FD_VINYL_BSTREAM_FTR_SZ) and tell the client the item was
         successfully processed.  The fences order the value bytes
         before the rd_err publication observed by the client. */

      memset( val + val_sz, 0, fd_vinyl_data_szc_obj_footprint( (ulong)obj->szc )
                               - (sizeof(fd_vinyl_data_obj_t) + sizeof(fd_vinyl_bstream_phdr_t) + val_sz) );

      FD_COMPILER_MFENCE();
      *rd_err = (schar)FD_VINYL_SUCCESS;
      FD_COMPILER_MFENCE();

    }

    /* Commit any bstream appends made by the case fragment before
       acknowledging the request. */

    if( FD_UNLIKELY( append_cnt ) ) fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );

    /* comp_err<=0 indicates a completion is due (success or error);
       comp_err>0 means the request type produces no completion. */

    if( FD_LIKELY( comp_err<=0 ) ) fd_vinyl_cq_send( cq, comp, req_id, link_id, comp_err, batch_cnt, fail_cnt, quota_rem );

    client->quota_rem = quota_rem;

    /* Update metrics.  Derive counters from vinyl locals.

       append_cnt is incremented in these places:
       - fd_vinyl_case_erase.c (fd_vinyl_io_append_dead, with accum_dead_cnt)
       - fd_vinyl_case_move.c (fd_vinyl_io_append_move, with accum_move_cnt)
       - fd_vinyl_case_move.c (fd_vinyl_io_append(pair))
       - fd_vinyl_case_release.c (fd_vinyl_io_append_pair_inplace)
       - fd_vinyl_case_release.c (fd_vinyl_io_append_dead, with accum_dead_cnt)

       We can thus infer the number of pair blocks appended by
       subtracting accum_*.

       NOTE(review): ctx->accum_dead_cnt is only refreshed after the
       exec loop, so when multiple requests append dead blocks within
       one call, dead_cnt here is cumulative since loop entry and the
       per-request BLOCKS_DEAD/BLOCKS_PAIR increments may double
       count / underflow — confirm against the case fragments. */

    ulong const dead_cnt = accum_dead_cnt - ctx->accum_dead_cnt;
    FD_MCNT_INC( ACCDB, BLOCKS_PAIR, append_cnt - dead_cnt );
    FD_MCNT_INC( ACCDB, BLOCKS_DEAD, dead_cnt );

  }

  ctx->accum_dead_cnt = accum_dead_cnt;
  ctx->accum_garbage_cnt = accum_garbage_cnt;
  ctx->accum_garbage_sz = accum_garbage_sz;
}
1018 :
/* Stem run loop configuration.  fd_stem.c is an x-macro style
   template: these defines select the callbacks and parameters for
   this tile's run loop before the template is included. */

#define STEM_BURST (1UL)   /* at most 1 frag published per run loop iteration */
#define STEM_LAZY  (10000) /* housekeep every 10 us */
#define STEM_CALLBACK_CONTEXT_TYPE  fd_vinyl_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN fd_vinyl_align()
#define STEM_CALLBACK_BEFORE_CREDIT before_credit
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown

#include "../../disco/stem/fd_stem.c"
1029 :
/* Topology descriptor for the accdb (vinyl) tile: wires the tile's
   setup, sandboxing, and run entry points into the topo runner. */

fd_topo_run_tile_t fd_tile_vinyl = {
  .name = NAME,
  .populate_allowed_fds = populate_allowed_fds,
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .scratch_align = scratch_align,
  .scratch_footprint = scratch_footprint,
  .privileged_init = privileged_init,
  .unprivileged_init = unprivileged_init,
  .run = stem_run,

  /* Depending on kernel version and file system, io_uring might spawn
     kthreads to do write I/O.  Unless we set this, fd_sandbox sets
     RLIMIT_NPROC to zero, which fails io_arm_poll_handler, which
     bubbles up as an ECANCELED. */
  .rlimit_nproc = 8UL
};
1046 :
1047 : #undef NAME
|