Line data Source code
1 : #include "../../disco/topo/fd_topo.h"
2 : #include "../../disco/metrics/fd_metrics.h"
3 : #include "../../ballet/lthash/fd_lthash_adder.h"
4 : #include "../../util/pod/fd_pod.h"
5 : #include "../../vinyl/io/fd_vinyl_io.h"
6 : #include "../../vinyl/io/ur/fd_vinyl_io_ur_private.h"
7 : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
8 : #include "../../util/io_uring/fd_io_uring_setup.h"
9 : #include "../../util/io_uring/fd_io_uring_register.h"
10 : #include "../../util/io_uring/fd_io_uring.h"
11 : #include "generated/fd_snaplh_tile_seccomp.h"
12 :
13 : #include "utils/fd_ssctrl.h"
14 : #include "utils/fd_vinyl_admin.h"
15 :
16 : #include <errno.h>
17 : #include <sys/stat.h> /* fstat */
18 : #include <fcntl.h> /* open */
19 : #include <unistd.h> /* close */
20 :
21 : #include "../../vinyl/io/ur/fd_vinyl_io_ur.h"
22 :
23 : #define NAME "snaplh"
24 :
25 : #define IN_CNT_MAX (2UL)
26 0 : #define IN_KIND_SNAPLV (0UL)
27 0 : #define IN_KIND_SNAPWH (1UL)
28 :
29 : #define VINYL_LTHASH_BLOCK_ALIGN FD_VINYL_BSTREAM_BLOCK_SZ
30 0 : #define VINYL_LTHASH_BLOCK_MAX_SZ (16UL<<20)
31 : FD_STATIC_ASSERT( VINYL_LTHASH_BLOCK_MAX_SZ>(sizeof(fd_snapshot_full_account_t)+FD_VINYL_BSTREAM_BLOCK_SZ+2*VINYL_LTHASH_BLOCK_ALIGN), "VINYL_LTHASH_BLOCK_MAX_SZ" );
32 :
33 0 : #define VINYL_LTHASH_RD_REQ_MAX (32UL)
34 0 : #define VINYL_LTHASH_IORING_DEPTH (2*VINYL_LTHASH_RD_REQ_MAX)
35 :
36 0 : #define VINYL_LTHASH_IO_SPAD_MAX (2<<20UL)
37 :
38 0 : #define VINYL_LTHASH_RD_REQ_FREE (0UL)
39 0 : #define VINYL_LTHASH_RD_REQ_PEND (1UL)
40 0 : #define VINYL_LTHASH_RD_REQ_SENT (2UL)
41 :
/* Per-input-link state (one entry per consumed link). */
struct in_link_private {
  fd_wksp_t *  wksp;      /* chunk base used when reading snaplv request frags */
  ulong        chunk0;
  ulong        wmark;
  ulong        mtu;
  void const * base;      /* chunk base used when reading snapwh data frags */
  ulong *      seq_sync;  /* fseq->seq[0] */
};
typedef struct in_link_private in_link_t;
51 :
/* Output link state.  chunk is the next dcache chunk to publish into;
   it is advanced within [chunk0,wmark] via fd_dcache_compact_next. */
struct out_link_private {
  fd_wksp_t * wksp;    /* chunk base for fd_chunk_to_laddr */
  ulong       chunk0;
  ulong       wmark;
  ulong       chunk;
  ulong       mtu;
};
typedef struct out_link_private out_link_t;
60 :
/* Tile context for snaplh.  The tile computes the lthash of the
   accounts loaded from a snapshot: accounts streamed from snapwh are
   added, accounts referenced by snaplv requests are read back from the
   vinyl bstream and subtracted, and the combined result is published
   when the snapshot finishes. */
struct fd_snaplh_tile {
  uint state;   /* FD_SNAPSHOT_STATE_* */
  int  full;    /* 1 while loading a full snapshot, 0 for incremental */

  ulong seed;                 /* filled by fd_rng_secure in privileged_init */
  ulong lthash_tile_cnt;      /* not referenced in this portion of the file */
  ulong lthash_tile_idx;      /* not referenced in this portion of the file */
  ulong lthash_tile_add_cnt;  /* snapwh pairs are sharded modulo this count ... */
  ulong lthash_tile_sub_cnt;  /* snaplv requests are sharded modulo this count ... */
  ulong lthash_tile_add_idx;  /* ... and this tile owns this add shard */
  ulong lthash_tile_sub_idx;  /* ... and this tile owns this sub shard */
  ulong pairs_seen;           /* snapwh pairs seen (owned or not) */
  ulong lthash_req_seen;      /* snaplv request frags seen (owned or not) */

  /* Database params */
  ulong const * io_seed;      /* bstream integrity hash seed, read with FD_VOLATILE_CONST */

  fd_lthash_adder_t adder[1];       /* accumulates added (snapwh) accounts */
  fd_lthash_adder_t adder_sub[1];   /* accumulates subtracted (snaplv) accounts */
  uchar data[FD_RUNTIME_ACC_SZ_MAX];

  fd_lthash_value_t running_lthash;
  fd_lthash_value_t running_lthash_sub;
  ulong running_capitalization_add;  /* lamports of added accounts */
  ulong running_capitalization_sub;  /* lamports of subtracted accounts */

  struct {
    int    dev_fd;    /* read-only fd of the vinyl bstream file */
    ulong  dev_sz;    /* bstream file size in bytes */
    ulong  dev_base;  /* offset of the first bstream byte (one block, after the sync block) */
    void * pair_mem;  /* scratch buffer for one pair */
    void * pair_tmp;  /* scratch buffer for wrap-around reads */

    /* In-flight io_uring read slots; phdr[i] is the expected header of
       the pair requested through rd_req[i]. */
    struct {
      fd_vinyl_bstream_phdr_t phdr  [VINYL_LTHASH_RD_REQ_MAX];
      fd_vinyl_io_rd_t        rd_req[VINYL_LTHASH_RD_REQ_MAX];
    } pending;
    ulong pending_rd_req_cnt;

    fd_vinyl_io_t *    io;     /* io_ur reader (NULL when io_uring is disabled) */
    fd_vinyl_admin_t * admin;
  } vinyl;

  struct {
    struct {
      ulong accounts_hashed;
    } full;

    struct {
      ulong accounts_hashed;
    } incremental;
  } metrics;

  ulong wh_finish_fseq;  /* snapwh seq target reassembled from FINI/FAIL tsorig/tspub */
  ulong wh_last_in_seq;  /* seq of the most recent snapwh frag */

  in_link_t  in[IN_CNT_MAX];
  uchar      in_kind[IN_CNT_MAX];  /* IN_KIND_* per input link */
  out_link_t out;

  int lthash_completion_pending;  /* FINI received, result not yet published */
  int fail_completion_pending;    /* FAIL received, not yet forwarded */

  /* io_uring setup */

  fd_io_uring_t ioring[1];
  int           io_uring_enabled;
};

typedef struct fd_snaplh_tile fd_snaplh_t;
131 :
132 : static inline int
133 0 : should_shutdown( fd_snaplh_t * ctx ) {
134 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
135 0 : }
136 :
137 : static ulong
138 0 : scratch_align( void ) {
139 0 : return alignof(fd_snaplh_t);
140 0 : }
141 :
142 : static ulong
143 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
144 0 : (void)tile;
145 0 : ulong l = FD_LAYOUT_INIT;
146 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
147 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
148 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
149 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
150 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ );
151 0 : l = FD_LAYOUT_APPEND( l, fd_vinyl_io_ur_align(), fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
152 0 : l = FD_LAYOUT_APPEND( l, fd_io_uring_shmem_align(), fd_io_uring_shmem_footprint( VINYL_LTHASH_IORING_DEPTH, VINYL_LTHASH_IORING_DEPTH ) );
153 0 : return FD_LAYOUT_FINI( l, alignof(fd_snaplh_t) );
154 0 : }
155 :
156 : static void
157 0 : metrics_write( fd_snaplh_t * ctx ) {
158 0 : FD_MGAUGE_SET( SNAPLH, FULL_ACCOUNTS_HASHED, ctx->metrics.full.accounts_hashed );
159 0 : FD_MGAUGE_SET( SNAPLH, INCREMENTAL_ACCOUNTS_HASHED, ctx->metrics.incremental.accounts_hashed );
160 0 : FD_MGAUGE_SET( SNAPLH, STATE, (ulong)(ctx->state) );
161 0 : }
162 :
163 : static inline int
164 0 : should_hash_account( fd_snaplh_t * ctx ) {
165 0 : return (ctx->pairs_seen % ctx->lthash_tile_add_cnt)==ctx->lthash_tile_add_idx;
166 0 : }
167 :
168 : static inline int
169 0 : should_process_lthash_request( fd_snaplh_t * ctx ) {
170 0 : return (ctx->lthash_req_seen % ctx->lthash_tile_sub_cnt)==ctx->lthash_tile_sub_idx;
171 0 : }
172 :
173 : static void
174 : streamlined_hash( fd_snaplh_t * restrict ctx,
175 : fd_lthash_adder_t * restrict adder,
176 : fd_lthash_value_t * restrict running_lthash,
177 : uchar const * restrict _pair,
178 0 : int is_add ) {
179 0 : uchar const * pair = _pair;
180 0 : fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t const *)pair;
181 0 : pair += sizeof(fd_vinyl_bstream_phdr_t);
182 0 : fd_account_meta_t const * meta = (fd_account_meta_t const *)pair;
183 0 : pair += sizeof(fd_account_meta_t);
184 0 : uchar const * data = pair;
185 :
186 0 : ulong data_len = meta->dlen;
187 0 : const char * pubkey = phdr->key.c;
188 0 : ulong lamports = meta->lamports;
189 0 : const uchar * owner = meta->owner;
190 0 : uchar executable = (uchar)( !meta->executable ? 0U : 1U) ;
191 :
192 0 : if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
193 0 : if( FD_UNLIKELY( lamports==0UL ) ) return;
194 :
195 0 : fd_lthash_adder_push_solana_account( adder,
196 0 : running_lthash,
197 0 : pubkey,
198 0 : data,
199 0 : data_len,
200 0 : lamports,
201 0 : executable,
202 0 : owner );
203 :
204 0 : if( is_add ) ctx->running_capitalization_add += lamports;
205 0 : else ctx->running_capitalization_sub += lamports;
206 :
207 0 : if( FD_LIKELY( ctx->full ) ) ctx->metrics.full.accounts_hashed++;
208 0 : else ctx->metrics.incremental.accounts_hashed++;
209 0 : }
210 :
211 : static void
212 : handle_vinyl_lthash_request_bd( fd_snaplh_t * ctx,
213 : ulong seq,
214 0 : fd_vinyl_bstream_phdr_t * acc_hdr ) {
215 :
216 : /* The bd version is blocking, therefore ctx->pending is not used. */
217 0 : ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
218 :
219 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
220 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
221 :
222 : /* dev_seq shows where the seq is physically located in device. */
223 0 : ulong dev_seq = ( seq + ctx->vinyl.dev_base ) % ctx->vinyl.dev_sz;
224 0 : ulong rd_off = fd_ulong_align_dn( dev_seq, FD_VINYL_BSTREAM_BLOCK_SZ );
225 0 : ulong pair_off = (dev_seq - rd_off);
226 0 : ulong rd_sz = fd_ulong_align_up( pair_off + pair_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
227 0 : FD_TEST( rd_sz < VINYL_LTHASH_BLOCK_MAX_SZ );
228 :
229 0 : uchar * pair = ((uchar*)ctx->vinyl.pair_mem) + pair_off;
230 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)pair;
231 :
232 0 : for(;;) {
233 0 : ulong sz = rd_sz;
234 0 : ulong rsz = fd_ulong_min( rd_sz, ctx->vinyl.dev_sz - rd_off );
235 0 : uchar * dst = ctx->vinyl.pair_mem;
236 0 : uchar * tmp = ctx->vinyl.pair_tmp;
237 :
238 0 : bd_read( ctx->vinyl.dev_fd, rd_off, dst, rsz );
239 0 : sz -= rsz;
240 0 : if( FD_UNLIKELY( sz ) ) {
241 : /* When the dev wraps around, the dev_base needs to be skipped.
242 : This means: increase the size multiple of the alignment,
243 : read into a temporary buffer, and memcpy into the dst at the
244 : correct offset. */
245 0 : bd_read( ctx->vinyl.dev_fd, 0, tmp, sz + FD_VINYL_BSTREAM_BLOCK_SZ );
246 0 : fd_memcpy( dst + rsz, tmp + ctx->vinyl.dev_base, sz );
247 0 : }
248 :
249 0 : if( FD_LIKELY( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) ) ) {
250 :
251 : /* test bstream pair integrity hashes */
252 0 : int test = !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz );
253 0 : if( FD_LIKELY( test ) ) break;
254 0 : }
255 0 : FD_LOG_WARNING(( "phdr mismatch! this should not happen under bstream_seq, continue ..." ));
256 0 : FD_SPIN_PAUSE();
257 0 : }
258 :
259 0 : streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair, 0 );
260 0 : }
261 :
262 : FD_FN_UNUSED static inline ulong
263 0 : rd_req_ctx_get_idx( ulong rd_req_ctx ) {
264 0 : return ( rd_req_ctx >> 0 ) & ((1UL<<32)-1UL);
265 0 : }
266 :
267 : FD_FN_UNUSED static inline ulong
268 0 : rd_req_ctx_get_status( ulong rd_req_ctx ) {
269 0 : return ( rd_req_ctx >> 32 ) & ((1UL<<32)-1UL);
270 0 : }
271 :
272 : FD_FN_UNUSED static inline void
273 : rd_req_ctx_into_parts( ulong rd_req_ctx,
274 : ulong * idx,
275 0 : ulong * status ) {
276 0 : *idx = rd_req_ctx_get_idx( rd_req_ctx );
277 0 : *status = rd_req_ctx_get_status( rd_req_ctx );
278 0 : }
279 :
280 : FD_FN_UNUSED static inline ulong
281 : rd_req_ctx_from_parts( ulong idx,
282 0 : ulong status ) {
283 0 : return ( idx & ((1UL<<32)-1UL) ) | ( status << 32 );
284 0 : }
285 :
286 : FD_FN_UNUSED static inline ulong
287 : rd_req_ctx_update_status( ulong rd_req_ctx,
288 0 : ulong status ) {
289 0 : return rd_req_ctx_from_parts( rd_req_ctx_get_idx( rd_req_ctx ), status );
290 0 : }
291 :
292 : static void
293 : handle_vinyl_lthash_compute_from_rd_req( fd_snaplh_t * ctx,
294 0 : fd_vinyl_io_rd_t * rd_req ) {
295 0 : ulong idx = rd_req_ctx_get_idx( rd_req->ctx );
296 :
297 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)rd_req->dst;
298 0 : fd_vinyl_bstream_phdr_t * acc_hdr = &ctx->vinyl.pending.phdr[ idx ];
299 :
300 : /* test the retrieved header (it must mach the request) */
301 0 : FD_TEST( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) );
302 :
303 0 : ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
304 0 : ulong seq = rd_req->seq;
305 0 : uchar * pair = (uchar*)rd_req->dst;
306 0 : ulong pair_sz = rd_req->sz;
307 :
308 : /* test the bstream pair integrity hashes */
309 0 : FD_TEST( !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz ) );
310 :
311 0 : streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair, 0 );
312 0 : }
313 :
314 : /* Process next read completion */
315 :
316 : static inline ulong
317 0 : consume_available_cqe( fd_snaplh_t * ctx ) {
318 0 : if( FD_LIKELY( !ctx->vinyl.pending_rd_req_cnt ) ) return 0UL;
319 0 : if( FD_UNLIKELY( !ctx->io_uring_enabled ) ) return 0UL;
320 0 : if( !fd_io_uring_cq_ready( ctx->ioring->cq ) ) return 0UL;
321 :
322 : /* At this point, there is at least one unconsumed CQE */
323 :
324 0 : fd_vinyl_io_rd_t * rd_req = NULL;
325 0 : if( FD_LIKELY( fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, 0/*non blocking*/ )==FD_VINYL_SUCCESS ) ) {
326 0 : handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
327 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
328 0 : rd_req->seq = ULONG_MAX;
329 0 : rd_req->sz = 0UL;
330 0 : ctx->vinyl.pending_rd_req_cnt--;
331 0 : return 1UL;
332 0 : }
333 0 : return 0UL;
334 0 : }
335 :
/* handle_vinyl_lthash_request_ur submits an asynchronous io_uring read
   for the bstream pair at sequence number seq, whose expected header is
   acc_hdr.  If all VINYL_LTHASH_RD_REQ_MAX slots are in flight, it first
   blocks for one completion, hashes it, and reuses that slot.  The
   expected header is stored in ctx->vinyl.pending.phdr[slot] so the
   completion handler can verify the data read back. */
static void
handle_vinyl_lthash_request_ur( fd_snaplh_t *             ctx,
                                ulong                     seq,
                                fd_vinyl_bstream_phdr_t * acc_hdr ) {
  /* Find a free slot */
  ulong free_i = ULONG_MAX;
  if( FD_LIKELY( ctx->vinyl.pending_rd_req_cnt<VINYL_LTHASH_RD_REQ_MAX ) ) {
    /* At least one slot is free: linear scan for the first one. */
    for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
      fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
      if( FD_UNLIKELY( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE ) ) {
        free_i = i;
        break;
      }
    }
  } else {
    /* Every slot is busy: block for one completion, hash it, release
       it, and reuse its slot (index recovered from rd_req->ctx). */
    fd_vinyl_io_rd_t * rd_req = NULL;
    fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
    FD_TEST( rd_req!=NULL );
    handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
    rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
    rd_req->seq = ULONG_MAX;
    rd_req->sz  = 0UL;
    free_i = rd_req_ctx_get_idx( rd_req->ctx );
    ctx->vinyl.pending_rd_req_cnt--;
  }
  FD_CRIT( free_i<VINYL_LTHASH_RD_REQ_MAX, "read request free index exceeds max value" );

  /* Populate the empty slot and submit */
  fd_vinyl_bstream_phdr_t * in_phdr = &ctx->vinyl.pending.phdr[ free_i ];
  *in_phdr = *acc_hdr;
  ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
  ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );

  /* Fixup io addressable range */
  /* NOTE(review): this overwrites the io's [seq_past,seq_present)
     window, and for the ur backend its clean/cache/write marks, with
     the block-aligned range covering this pair.  Presumably this makes
     the io accept the read, and it is only safe because this tile never
     appends through the io; confirm against fd_vinyl_io_ur. */
  fd_vinyl_io_t * io = ctx->vinyl.io;
  io->seq_past    = fd_ulong_align_dn( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
  io->seq_present = fd_ulong_align_up( seq+pair_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  if( io->type==FD_VINYL_IO_TYPE_UR ) {
    fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io;
    ur->seq_clean = ur->seq_cache = ur->seq_write = io->seq_present;
  }

  /* The slot is marked PEND while the read is submitted and SENT once
     fd_vinyl_io_read has returned. */
  fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ free_i ];
  rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_PEND );
  rd_req->seq = seq;
  rd_req->sz  = pair_sz;
  fd_vinyl_io_read( ctx->vinyl.io, rd_req );
  rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_SENT );
  ctx->vinyl.pending_rd_req_cnt++;
}
386 :
387 : static void
388 0 : handle_vinyl_lthash_request_ur_consume_all( fd_snaplh_t * ctx ) {
389 0 : while( ctx->vinyl.pending_rd_req_cnt ) {
390 0 : fd_vinyl_io_rd_t * rd_req = NULL;
391 0 : fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
392 0 : FD_TEST( rd_req!=NULL );
393 0 : handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
394 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
395 0 : rd_req->seq = ULONG_MAX;
396 0 : rd_req->sz = 0UL;
397 0 : ctx->vinyl.pending_rd_req_cnt--;
398 0 : }
399 0 : FD_CRIT( !ctx->vinyl.pending_rd_req_cnt, "pending read requests count not zero" );
400 0 : for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
401 0 : fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
402 0 : FD_CRIT( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE, "pending request status is not free" );
403 0 : }
404 0 : }
405 :
406 : static void
407 : handle_lthash_completion( fd_snaplh_t * ctx,
408 0 : fd_stem_context_t * stem ) {
409 0 : if( FD_LIKELY( !ctx->lthash_completion_pending ) ) return;
410 :
411 0 : if( fd_seq_inc( ctx->wh_last_in_seq, 1UL )>=ctx->wh_finish_fseq ) {
412 0 : fd_lthash_adder_flush( ctx->adder, &ctx->running_lthash );
413 0 : fd_lthash_adder_flush( ctx->adder_sub, &ctx->running_lthash_sub );
414 0 : fd_lthash_sub( &ctx->running_lthash, &ctx->running_lthash_sub );
415 0 : fd_ssctrl_hash_result_t * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
416 0 : fd_memcpy( out->lthash.bytes, &ctx->running_lthash, sizeof(fd_lthash_value_t) );
417 0 : long capitalization_add = fd_long_if( ctx->running_capitalization_add>LONG_MAX, LONG_MAX, (long)ctx->running_capitalization_add );
418 0 : long capitalization_sub = fd_long_if( ctx->running_capitalization_sub>LONG_MAX, LONG_MAX, (long)ctx->running_capitalization_sub );
419 0 : if( FD_UNLIKELY( capitalization_add==LONG_MAX ) ) {
420 0 : FD_LOG_ERR(( "capitalization overflow detected: add=%lu", ctx->running_capitalization_add ));
421 0 : }
422 0 : if( FD_UNLIKELY( capitalization_sub==LONG_MAX ) ) {
423 0 : FD_LOG_ERR(( "capitalization overflow detected: sub=%lu", ctx->running_capitalization_sub ));
424 0 : }
425 0 : out->capitalization = capitalization_add - capitalization_sub;
426 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_HASH_MSG_RESULT_ADD, ctx->out.chunk, sizeof(fd_ssctrl_hash_result_t), 0UL, 0UL, 0UL );
427 0 : ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, sizeof(fd_ssctrl_hash_result_t), ctx->out.chunk0, ctx->out.wmark );
428 0 : ctx->lthash_completion_pending = 0;
429 0 : }
430 0 : }
431 :
432 : static void
433 : handle_fail_completion( fd_snaplh_t * ctx,
434 0 : fd_stem_context_t * stem ) {
435 0 : if( FD_LIKELY( !ctx->fail_completion_pending ) ) return;
436 :
437 0 : if( fd_seq_inc( ctx->wh_last_in_seq, 1UL )>=ctx->wh_finish_fseq ) {
438 0 : fd_lthash_adder_flush( ctx->adder, &ctx->running_lthash );
439 0 : fd_lthash_adder_flush( ctx->adder_sub, &ctx->running_lthash_sub );
440 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
441 0 : ctx->fail_completion_pending = 0;
442 0 : }
443 0 : }
444 :
445 : static void
446 : before_credit( fd_snaplh_t * ctx,
447 : fd_stem_context_t * stem FD_PARAM_UNUSED,
448 0 : int * charge_busy ) {
449 0 : *charge_busy = !!consume_available_cqe( ctx );
450 0 : }
451 :
452 : static void
453 : handle_wh_data_frag( fd_snaplh_t * ctx,
454 : ulong in_idx,
455 : ulong chunk, /* compressed input pointer */
456 : ulong sz_comp, /* compressed input size */
457 0 : fd_stem_context_t * stem ) {
458 0 : FD_CRIT( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH, "incorrect in kind" );
459 :
460 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
461 : /* skip all wh data frags when in error state. */
462 0 : return;
463 0 : }
464 0 : if( FD_UNLIKELY( ctx->fail_completion_pending ) ) {
465 : /* handle_fail_completion may succeed (complete) either when the
466 : control frag that triggers it is received (conditional upon
467 : having no pending wh data frags) or after all wh data frags have
468 : been received and processed. Once the fail control message
469 : is received, the state transitions into idle. */
470 0 : handle_fail_completion( ctx, stem );
471 0 : return;
472 0 : }
473 :
474 0 : uchar const * rem = fd_chunk_to_laddr_const( ctx->in[ in_idx ].base, chunk );
475 0 : ulong rem_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
476 0 : FD_CRIT( fd_ulong_is_aligned( (ulong)rem, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
477 0 : FD_CRIT( fd_ulong_is_aligned( rem_sz, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
478 :
479 0 : while( rem_sz ) {
480 0 : FD_CRIT( rem_sz>=FD_VINYL_BSTREAM_BLOCK_SZ, "corrupted bstream block" );
481 :
482 0 : fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t *)rem;
483 0 : ulong ctl = phdr->ctl;
484 0 : int ctl_type = fd_vinyl_bstream_ctl_type( ctl );
485 0 : switch( ctl_type ) {
486 :
487 0 : case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {
488 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz( ctl );
489 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
490 0 : if( FD_LIKELY( should_hash_account( ctx ) ) ) {
491 0 : uchar * pair = ctx->vinyl.pair_mem;
492 0 : fd_memcpy( pair, rem, pair_sz );
493 0 : streamlined_hash( ctx, ctx->adder, &ctx->running_lthash, pair, 1 );
494 0 : }
495 0 : rem += pair_sz;
496 0 : rem_sz -= pair_sz;
497 0 : ctx->pairs_seen++;
498 0 : break;
499 0 : }
500 :
501 0 : case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {
502 0 : rem += FD_VINYL_BSTREAM_BLOCK_SZ;
503 0 : rem_sz -= FD_VINYL_BSTREAM_BLOCK_SZ;
504 0 : break;
505 0 : }
506 :
507 0 : default:
508 0 : FD_LOG_CRIT(( "unexpected vinyl bstream block ctl %016lx for %s snapshot",
509 0 : ctl, ctx->full?"full":"incremental" ));
510 0 : }
511 0 : }
512 :
513 0 : if( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) {
514 : /* handle_lthash_completion may succeed (complete) either when the
515 : control frag that triggers it is received (conditional upon
516 : having no pending wh data frags) or after all wh data frags have
517 : been received and processed. */
518 0 : handle_lthash_completion( ctx, stem );
519 0 : }
520 0 : }
521 :
522 : static void
523 : handle_lv_data_frag( fd_snaplh_t * ctx,
524 : ulong in_idx,
525 : ulong sig,
526 0 : ulong chunk ) { /* compressed input pointer */
527 :
528 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
529 : /* skip all lv data frags when in error state. */
530 0 : return;
531 0 : }
532 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
533 0 : FD_LOG_ERR(( "received snaplv data frag %s (%lu) in state %s (%lu)",
534 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
535 0 : fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
536 0 : return;
537 0 : }
538 :
539 0 : if( FD_LIKELY( should_process_lthash_request( ctx ) ) ) {
540 0 : uchar const * indata = fd_chunk_to_laddr_const( ctx->in[ in_idx ].wksp, chunk );
541 0 : ulong seq;
542 0 : fd_vinyl_bstream_phdr_t acc_hdr[1];
543 0 : memcpy( &seq, indata, sizeof(ulong) );
544 0 : memcpy( acc_hdr, indata + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
545 0 : if( FD_LIKELY( ctx->io_uring_enabled ) ) {
546 0 : handle_vinyl_lthash_request_ur( ctx, seq, acc_hdr );
547 0 : } else {
548 0 : handle_vinyl_lthash_request_bd( ctx, seq, acc_hdr );
549 0 : }
550 0 : }
551 0 : ctx->lthash_req_seen++;
552 0 : }
553 :
554 : static inline ulong
555 : tsorig_tspub_to_fseq( ulong tsorig,
556 0 : ulong tspub ) {
557 0 : return (tspub<<32 ) | tsorig;
558 0 : }
559 :
560 : static void
561 : handle_control_frag( fd_snaplh_t * ctx,
562 : ulong sig,
563 : ulong tsorig,
564 : ulong tspub,
565 0 : fd_stem_context_t * stem ) {
566 0 : if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
567 : /* Control messages move along the snapshot load pipeline. Since
568 : error conditions can be triggered by any tile in the pipeline,
569 : it is possible to be in error state and still receive otherwise
570 : valid messages. Only a fail message can revert this. */
571 0 : return;
572 0 : };
573 :
574 0 : switch( sig ) {
575 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
576 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
577 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
578 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
579 0 : ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
580 0 : fd_lthash_zero( &ctx->running_lthash );
581 0 : fd_lthash_zero( &ctx->running_lthash_sub );
582 0 : fd_lthash_adder_new( ctx->adder );
583 0 : fd_lthash_adder_new( ctx->adder_sub );
584 0 : ctx->running_capitalization_add = 0UL;
585 0 : ctx->running_capitalization_sub = 0UL;
586 0 : break;
587 0 : }
588 :
589 0 : case FD_SNAPSHOT_MSG_CTRL_FINI: {
590 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
591 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
592 0 : ctx->wh_finish_fseq = tsorig_tspub_to_fseq( tsorig, tspub );
593 0 : if( FD_LIKELY( ctx->io_uring_enabled ) ) {
594 0 : handle_vinyl_lthash_request_ur_consume_all( ctx );
595 0 : }
596 0 : ctx->lthash_completion_pending = 1;
597 : /* handle_lthash_completion may succeed (complete) either here
598 : (if there are no pending wh data frags) or after all wh data
599 : frags have been received and processed. */
600 0 : handle_lthash_completion( ctx, stem );
601 0 : break;
602 0 : }
603 :
604 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
605 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
606 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
607 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
608 0 : break;
609 0 : }
610 :
611 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR: {
612 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
613 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
614 0 : break;
615 0 : }
616 :
617 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL: {
618 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
619 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
620 0 : ctx->wh_finish_fseq = tsorig_tspub_to_fseq( tsorig, tspub );
621 0 : if( FD_LIKELY( ctx->io_uring_enabled ) ) {
622 0 : handle_vinyl_lthash_request_ur_consume_all( ctx );
623 0 : }
624 0 : ctx->fail_completion_pending = 1;
625 : /* handle_fail_completion may succeed (complete) either here (if
626 : there are no pending wh data frags) or after all wh data frags
627 : have been received and processed. */
628 0 : handle_fail_completion( ctx, stem );
629 0 : break;
630 0 : }
631 :
632 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
633 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
634 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
635 0 : break;
636 :
637 0 : default: {
638 0 : FD_LOG_ERR(( "received unexpected control frag %s (%lu) in state %s (%lu)",
639 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
640 0 : fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
641 0 : break;
642 0 : }
643 0 : }
644 0 : }
645 :
646 : static inline int
647 : returnable_frag( fd_snaplh_t * ctx,
648 : ulong in_idx,
649 : ulong seq,
650 : ulong sig,
651 : ulong chunk,
652 : ulong sz,
653 : ulong ctl,
654 : ulong tsorig,
655 : ulong tspub,
656 0 : fd_stem_context_t * stem ) {
657 0 : (void)sz; (void)ctl;
658 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
659 :
660 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) ) handle_wh_data_frag( ctx, in_idx, chunk, tsorig, stem );
661 0 : else if( FD_UNLIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) handle_lv_data_frag( ctx, in_idx, sig, chunk );
662 0 : else handle_control_frag( ctx, sig, tsorig, tspub, stem );
663 :
664 : /* Because fd_stem may not return flow control credits fast enough,
665 : always update fseq (consumer progress) here. */
666 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) ) {
667 0 : ctx->wh_last_in_seq = seq;
668 0 : fd_fseq_update( ctx->in[ in_idx ].seq_sync, fd_seq_inc( seq, 1UL ) );
669 0 : }
670 :
671 0 : return 0;
672 0 : }
673 :
674 : static ulong
675 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
676 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
677 : ulong out_fds_cnt,
678 0 : int * out_fds ) {
679 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "incorrect out_fds_cnt %lu", out_fds_cnt ));
680 :
681 0 : ulong out_cnt = 0;
682 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
683 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
684 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
685 0 : }
686 :
687 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
688 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
689 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
690 :
691 0 : out_fds[ out_cnt++ ] = ctx->vinyl.dev_fd;
692 :
693 0 : if( FD_LIKELY( ctx->ioring->ioring_fd>=0 ) ) {
694 0 : out_fds[ out_cnt++ ] = ctx->ioring->ioring_fd;
695 0 : }
696 :
697 0 : return out_cnt;
698 0 : }
699 :
700 : static void
701 0 : during_housekeeping( fd_snaplh_t * ctx ) {
702 :
703 : /* Service io_uring instance */
704 :
705 0 : if( FD_LIKELY( ctx->io_uring_enabled ) ) {
706 0 : uint sq_drops = fd_io_uring_sq_dropped( ctx->ioring->sq );
707 0 : if( FD_UNLIKELY( sq_drops ) ) {
708 0 : FD_LOG_CRIT(( "kernel io_uring dropped I/O requests, cannot continue (sq_dropped=%u)", sq_drops ));
709 0 : }
710 :
711 0 : uint cq_drops = fd_io_uring_cq_overflow( ctx->ioring->cq );
712 0 : if( FD_UNLIKELY( cq_drops ) ) {
713 0 : FD_LOG_CRIT(( "kernel io_uring dropped I/O completions, cannot continue (cq_overflow=%u)", cq_drops ));
714 0 : }
715 0 : }
716 :
717 0 : }
718 :
719 : static ulong
720 : populate_allowed_seccomp( fd_topo_t const * topo,
721 : fd_topo_tile_t const * tile,
722 : ulong out_cnt,
723 0 : struct sock_filter * out ) {
724 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
725 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
726 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
727 :
728 0 : populate_sock_filter_policy_fd_snaplh_tile( out_cnt, out,
729 0 : (uint)fd_log_private_logfile_fd(),
730 0 : (uint)ctx->vinyl.dev_fd,
731 0 : (uint)ctx->ioring->ioring_fd /* possibly -1 */ );
732 0 : return sock_filter_policy_fd_snaplh_tile_instr_cnt;
733 0 : }
734 :
735 : static fd_vinyl_io_t *
736 : snaplh_io_uring_init( fd_snaplh_t * ctx,
737 : void * uring_shmem,
738 : void * vinyl_io_ur_mem,
739 0 : int dev_fd ) {
740 0 : ulong const uring_depth = VINYL_LTHASH_IORING_DEPTH;
741 0 : fd_io_uring_params_t params[1];
742 0 : fd_io_uring_params_init( params, (uint)uring_depth );
743 :
744 0 : if( FD_UNLIKELY( !fd_io_uring_init_shmem( ctx->ioring, params, uring_shmem, uring_depth, uring_depth ) ) ) {
745 0 : FD_LOG_ERR(( "fd_io_uring_init_shmem failed (%i-%s)", errno, fd_io_strerror( errno ) ));
746 0 : }
747 0 : fd_io_uring_t * ioring = ctx->ioring;
748 :
749 0 : if( FD_UNLIKELY( fd_io_uring_register_files( ioring->ioring_fd, &dev_fd, 1 )<0 ) ) {
750 0 : FD_LOG_ERR(( "io_uring_register_files failed (%i-%s)", errno, fd_io_strerror( errno ) ));
751 0 : }
752 :
753 0 : fd_io_uring_restriction_t res[3] = {
754 0 : { .opcode = FD_IORING_RESTRICTION_SQE_OP,
755 0 : .sqe_op = IORING_OP_READ },
756 0 : { .opcode = FD_IORING_RESTRICTION_SQE_FLAGS_REQUIRED,
757 0 : .sqe_flags = IOSQE_FIXED_FILE },
758 0 : { .opcode = FD_IORING_RESTRICTION_SQE_FLAGS_ALLOWED,
759 0 : .sqe_flags = 0 }
760 0 : };
761 0 : if( FD_UNLIKELY( fd_io_uring_register_restrictions( ioring->ioring_fd, res, 3U )<0 ) ) {
762 0 : FD_LOG_ERR(( "io_uring_register_restrictions failed (%i-%s)", errno, fd_io_strerror( errno ) ));
763 0 : }
764 :
765 0 : if( FD_UNLIKELY( fd_io_uring_enable_rings( ioring->ioring_fd )<0 ) ) {
766 0 : FD_LOG_ERR(( "io_uring_enable_rings failed (%i-%s)", errno, fd_io_strerror( errno ) ));
767 0 : }
768 :
769 0 : ulong align = fd_vinyl_io_ur_align();
770 0 : FD_TEST( fd_ulong_is_pow2( align ) );
771 :
772 0 : ulong footprint = fd_vinyl_io_ur_footprint( VINYL_LTHASH_IO_SPAD_MAX );
773 0 : FD_TEST( fd_ulong_is_aligned( footprint, align ) );
774 :
775 : /* Before invoking fd_vinyl_io_ur_init, the sync block must be
776 : already available. Although in principle one could keep
777 : calling fd_vinyl_io_ur_init until it returns !=NULL, doing this
778 : would log uncessary (and misleading) warnings. */
779 0 : FD_LOG_INFO(( "waiting for account database creation" ));
780 0 : for(;;) {
781 0 : fd_vinyl_bstream_block_t block[1];
782 0 : ulong dev_sync = 0UL; /* Use the beginning of the file for the sync block */
783 0 : bd_read( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
784 0 : int type = fd_vinyl_bstream_ctl_type( block->sync.ctl );
785 0 : if( FD_UNLIKELY( type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC ) ) continue;
786 0 : ulong io_seed = block->sync.hash_trail;
787 0 : if( FD_LIKELY( !fd_vinyl_bstream_block_test( io_seed, block ) ) ) break;
788 0 : fd_log_sleep( 1e6 ); /* 1ms */
789 0 : }
790 0 : FD_LOG_INFO(( "found valid account database sync block, attaching ..." ));
791 :
792 0 : fd_vinyl_io_t * io = fd_vinyl_io_ur_init( vinyl_io_ur_mem, VINYL_LTHASH_IO_SPAD_MAX, dev_fd, ioring );
793 0 : if( FD_UNLIKELY( !io ) ) FD_LOG_ERR(( "vinyl_io_ur_init failed" ));
794 0 : return io;
795 0 : }
796 :
797 : static void
798 : privileged_init( fd_topo_t * topo,
799 0 : fd_topo_tile_t * tile ) {
800 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
801 :
802 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
803 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
804 0 : void * pair_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ ); (void)pair_mem;
805 0 : void * pair_tmp = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ ); (void)pair_tmp;
806 0 : void * rd_req_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ ); (void)rd_req_mem;
807 0 : void * uring_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_io_ur_align(), fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
808 0 : void * uring_shmem = FD_SCRATCH_ALLOC_APPEND( l, fd_io_uring_shmem_align(), fd_io_uring_shmem_footprint( VINYL_LTHASH_IORING_DEPTH, VINYL_LTHASH_IORING_DEPTH ) );
809 :
810 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
811 :
812 : /* Set up io_bd dependencies */
813 :
814 0 : char const * bstream_path = tile->snaplh.vinyl_path;
815 : /* Note: it would be possible to use O_DIRECT, but it would require
816 : VINYL_LTHASH_BLOCK_ALIGN to be 4096UL, which substantially
817 : increases the read overhead, making it slower (keep in mind that
818 : a rather large subset of mainnet accounts typically fits inside
819 : one FD_VINYL_BSTREAM_BLOCK_SZ. */
820 0 : int dev_fd = open( bstream_path, O_RDONLY|O_CLOEXEC, 0444 );
821 0 : if( FD_UNLIKELY( dev_fd<0 ) ) {
822 0 : FD_LOG_ERR(( "open(%s,O_RDONLY|O_CLOEXEC, 0444) failed (%i-%s)",
823 0 : bstream_path, errno, fd_io_strerror( errno ) ));
824 0 : }
825 :
826 0 : struct stat st;
827 0 : if( FD_UNLIKELY( 0!=fstat( dev_fd, &st ) ) ) FD_LOG_ERR(( "fstat(%s) failed (%i-%s)", bstream_path, errno, strerror( errno ) ));
828 :
829 0 : ctx->vinyl.dev_fd = dev_fd;
830 0 : ulong bstream_sz = (ulong)st.st_size;
831 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( bstream_sz, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
832 0 : FD_LOG_ERR(( "vinyl file %s has misaligned size (%lu bytes)", bstream_path, bstream_sz ));
833 0 : }
834 0 : ctx->vinyl.dev_sz = bstream_sz;
835 0 : ctx->vinyl.dev_base = FD_VINYL_BSTREAM_BLOCK_SZ;
836 :
837 0 : ctx->vinyl.io = NULL;
838 0 : ctx->ioring->ioring_fd = -1;
839 :
840 0 : if( FD_LIKELY( tile->snaplh.io_uring_enabled ) ) {
841 0 : ctx->vinyl.io = snaplh_io_uring_init( ctx, uring_shmem, uring_mem, dev_fd );
842 0 : }
843 0 : ctx->io_uring_enabled = tile->snaplh.io_uring_enabled;
844 0 : }
845 :
846 : static void
847 : unprivileged_init( fd_topo_t * topo,
848 0 : fd_topo_tile_t * tile ) {
849 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
850 :
851 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
852 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
853 0 : void * pair_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
854 0 : void * pair_tmp = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
855 0 : void * rd_req_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ );
856 :
857 0 : FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
858 :
859 0 : ctx->vinyl.pair_mem = pair_mem;
860 0 : ctx->vinyl.pair_tmp = pair_tmp;
861 :
862 0 : if( FD_UNLIKELY( tile->in_cnt!=IN_CNT_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu", tile->in_cnt, IN_CNT_MAX ));
863 0 : if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt ));
864 :
865 0 : ctx->io_seed = NULL;
866 :
867 0 : for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
868 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
869 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
870 0 : if( FD_LIKELY( 0==strcmp( in_link->name, "snaplv_lh" ) ) ) {
871 0 : ctx->in[ i ].wksp = in_wksp->wksp;
872 0 : ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].wksp, in_link->dcache );
873 0 : ctx->in[ i ].wmark = fd_dcache_compact_wmark( ctx->in[ i ].wksp, in_link->dcache, in_link->mtu );
874 0 : ctx->in[ i ].mtu = in_link->mtu;
875 0 : ctx->in[ i ].base = NULL;
876 0 : ctx->in[ i ].seq_sync = NULL;
877 0 : ctx->in_kind[ i ] = IN_KIND_SNAPLV;
878 0 : } else if( FD_LIKELY( 0==strcmp( in_link->name, "snapwh_wr" ) ) ) {
879 0 : ctx->in[ i ].wksp = in_wksp->wksp;
880 0 : ctx->in[ i ].chunk0 = 0;
881 0 : ctx->in[ i ].wmark = 0;
882 0 : ctx->in[ i ].mtu = 0;
883 0 : ctx->in[ i ].base = fd_dcache_join( fd_topo_obj_laddr( topo, tile->snaplh.dcache_obj_id ) );
884 0 : ctx->in[ i ].seq_sync = tile->in_link_fseq[ i ];
885 0 : ctx->wh_last_in_seq = fd_fseq_query( tile->in_link_fseq[ i ] );
886 0 : ctx->in_kind[ i ] = IN_KIND_SNAPWH;
887 0 : ctx->io_seed = (ulong const *)fd_dcache_app_laddr_const( ctx->in[ i ].base );
888 0 : FD_TEST( ctx->in[ i ].base );
889 0 : } else {
890 0 : FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
891 0 : }
892 0 : }
893 :
894 0 : FD_TEST( ctx->io_seed );
895 :
896 0 : fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ 0UL ] ];
897 0 : ctx->out.wksp = topo->workspaces[ topo->objs[ out_link->dcache_obj_id ].wksp_id ].wksp;
898 0 : ctx->out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( out_link->dcache ), out_link->dcache );
899 0 : ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, out_link->dcache, out_link->mtu );
900 0 : ctx->out.chunk = ctx->out.chunk0;
901 0 : ctx->out.mtu = out_link->mtu;
902 0 : FD_TEST( 0==strcmp( out_link->name, "snaplh_lv" ) );
903 :
904 0 : fd_lthash_adder_new( ctx->adder );
905 0 : fd_lthash_adder_new( ctx->adder_sub );
906 :
907 0 : ctx->metrics.full.accounts_hashed = 0UL;
908 0 : ctx->metrics.incremental.accounts_hashed = 0UL;
909 :
910 0 : memset( ctx->vinyl.pending.phdr, 0, sizeof(fd_vinyl_bstream_phdr_t) * VINYL_LTHASH_RD_REQ_MAX );
911 0 : memset( ctx->vinyl.pending.rd_req, 0, sizeof(fd_vinyl_io_rd_t) * VINYL_LTHASH_RD_REQ_MAX );
912 0 : for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
913 0 : fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
914 0 : rd_req->ctx = rd_req_ctx_from_parts( i, VINYL_LTHASH_RD_REQ_FREE );
915 0 : rd_req->dst = NULL;
916 0 : if( rd_req_mem!=NULL ) {
917 0 : rd_req->dst = ((uchar*)rd_req_mem) + i*VINYL_LTHASH_BLOCK_MAX_SZ;
918 0 : }
919 0 : }
920 0 : ctx->vinyl.pending_rd_req_cnt = 0UL;
921 :
922 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
923 0 : ctx->full = 1;
924 0 : ctx->lthash_tile_cnt = fd_topo_tile_name_cnt( topo, "snaplh" );
925 0 : ctx->lthash_tile_idx = tile->kind_id;
926 : /* This may seem redundant, but it provides flexibility around which
927 : tiles and do addition and subtraction of lthash. */
928 0 : ctx->lthash_tile_add_cnt = ctx->lthash_tile_cnt;
929 0 : ctx->lthash_tile_sub_cnt = ctx->lthash_tile_cnt;
930 0 : ctx->lthash_tile_add_idx = ctx->lthash_tile_idx;
931 0 : ctx->lthash_tile_sub_idx = ctx->lthash_tile_idx;
932 0 : if( ctx->lthash_tile_add_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_add_idx < ctx->lthash_tile_add_cnt );
933 0 : if( ctx->lthash_tile_sub_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_sub_idx < ctx->lthash_tile_sub_cnt );
934 0 : ctx->pairs_seen = 0UL;
935 0 : ctx->lthash_req_seen = 0UL;
936 0 : fd_lthash_zero( &ctx->running_lthash );
937 0 : fd_lthash_zero( &ctx->running_lthash_sub );
938 0 : ctx->running_capitalization_add = 0UL;
939 0 : ctx->running_capitalization_sub = 0UL;
940 :
941 0 : ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
942 0 : FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
943 0 : fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
944 0 : FD_TEST( vinyl_admin );
945 0 : ctx->vinyl.admin = vinyl_admin;
946 0 : for(;;) {
947 : /* This query can be done without the need of an rwlock. */
948 0 : ulong vinyl_admin_status = fd_vinyl_admin_ulong_query( &vinyl_admin->status );
949 0 : if( FD_LIKELY( vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_INIT_PENDING &&
950 0 : vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_ERROR ) ) break;
951 0 : fd_log_sleep( (long)1e6 /*1ms*/ );
952 0 : FD_SPIN_PAUSE();
953 0 : }
954 :
955 0 : ctx->lthash_completion_pending = 0;
956 0 : ctx->fail_completion_pending = 0;
957 0 : }
958 :
/* Configuration for the generic stem run loop, which is instantiated
   for this tile by textually including fd_stem.c below.  All of these
   macros must be defined before that include. */

/* Presumably the max number of frags published per callback invocation
   and the housekeeping laziness interval -- see fd_stem.c to confirm. */
#define STEM_BURST 1UL
#define STEM_LAZY 1000L

/* The per-tile context type handed to every callback. */
#define STEM_CALLBACK_CONTEXT_TYPE fd_snaplh_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplh_t)

/* Callback hooks; the named functions are expected to be defined
   earlier in this file (not visible in this section). */
#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
#define STEM_CALLBACK_BEFORE_CREDIT before_credit
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping

#include "../../disco/stem/fd_stem.c"
972 :
973 : fd_topo_run_tile_t fd_tile_snaplh = {
974 : .name = NAME,
975 : .populate_allowed_fds = populate_allowed_fds,
976 : .populate_allowed_seccomp = populate_allowed_seccomp,
977 : .scratch_align = scratch_align,
978 : .scratch_footprint = scratch_footprint,
979 : .privileged_init = privileged_init,
980 : .unprivileged_init = unprivileged_init,
981 : .run = stem_run,
982 : };
983 :
984 : #undef NAME
|