Line data Source code
1 : #define _DEFAULT_SOURCE /* madvise */
2 : #include "fd_snapwm_tile_private.h"
3 : #include "utils/fd_ssctrl.h"
4 : #include "utils/fd_ssparse.h"
5 : #include "utils/fd_vinyl_io_wd.h"
6 : #include "../../ballet/lthash/fd_lthash.h"
7 : #include "../../ballet/lthash/fd_lthash_adder.h"
8 : #include "../../util/pod/fd_pod.h"
9 :
10 : #include <errno.h>
11 : #include <fcntl.h> /* open */
12 : #include <sys/mman.h> /* mmap, madvise */
13 : #include <sys/stat.h> /* fstat */
14 : #include <unistd.h> /* close */
15 :
16 : #include "generated/fd_snapwm_tile_vinyl_seccomp.h"
17 :
18 : FD_STATIC_ASSERT( WD_WR_FSEQ_CNT_MAX<=FD_TOPO_MAX_TILE_IN_LINKS, "WD_WR_FSEQ_CNT_MAX" );
19 :
20 : /**********************************************************************\
21 :
22 : Vinyl 101:
23 : - Vinyl is Firedancer's main account database
24 : - Vinyl consists of several on-disk and in-memory components
25 : - vinyl_bstream is a single file containing all vinyl records
26 : - vinyl_bstream is the source of truth
27 : - vinyl_meta indexes the latest revisions of all elements in
28 : vinyl_bstream
29 : - Vinyl has an in-memory caching layer, but snapwm does not use it
30 :
31 : The snapshot loader must:
32 : - Load the most recent version of each account into bstream
33 : - Create a full vinyl_meta index of accounts
34 : - Recover from load failures and retry
35 :
36 : Note on I/O layers:
37 : - io_mm is the slow/generic memory mapped I/O backend.
38 : - io_wd is the fast/dumb O_DIRECT backend. Can only append, thus used
39 : for hot path account writing.
40 : - io_mm and io_wd cannot be active at the same time -- snapwm will
41 : switch between them as necessary.
42 :
43 : Full snapshot logic:
44 : - Write accounts to bstream (io_wd)
45 : - Synchronously populate the vinyl_meta index while writing
46 : - On load failure, destroy and recreate the bstream (io_mm)
47 :
48 : Incremental snapshot logic:
49 : - Phase 1: while reading the incremental snapshot
50 : - Write accounts to bstream without updating the index (io_wd)
51 : - On load failure, undo writes done to bstream (io_mm)
52 : - Phase 2: once read is done
53 : - Replay all elements written to bstream (io_mm)
54 : - Populate the vinyl_meta index while replaying
55 :
56 : \**********************************************************************/
57 :
58 : void
59 : fd_snapwm_vinyl_privileged_init( fd_snapwm_tile_t * ctx,
60 : fd_topo_t * topo,
61 0 : fd_topo_tile_t * tile ) {
62 0 : void * shmap = fd_topo_obj_laddr( topo, tile->snapwm.vinyl_meta_map_obj_id );
63 0 : void * shele = fd_topo_obj_laddr( topo, tile->snapwm.vinyl_meta_pool_obj_id );
64 :
65 0 : FD_TEST( fd_vinyl_meta_join( ctx->vinyl.map, shmap, shele ) );
66 :
67 : /* Set up io_mm dependencies */
68 :
69 0 : char const * bstream_path = tile->snapwm.vinyl_path;
70 0 : int bstream_fd = open( bstream_path, O_RDWR|O_CLOEXEC, 0644 );
71 0 : if( FD_UNLIKELY( bstream_fd<0 ) ) {
72 0 : FD_LOG_ERR(( "open(%s,O_RDWR|O_CLOEXEC,0644) failed (%i-%s)",
73 0 : bstream_path, errno, fd_io_strerror( errno ) ));
74 0 : }
75 :
76 0 : struct stat st;
77 0 : if( FD_UNLIKELY( fstat( bstream_fd, &st )!=0 ) ) {
78 0 : FD_LOG_ERR(( "fstat(%s) failed (%i-%s)",
79 0 : bstream_path, errno, fd_io_strerror( errno ) ));
80 0 : }
81 0 : ulong bstream_sz = (ulong)st.st_size;
82 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( bstream_sz, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
83 0 : FD_LOG_ERR(( "vinyl file %s has misaligned size (%lu bytes)", bstream_path, bstream_sz ));
84 0 : }
85 :
86 0 : void * bstream_mem = mmap( NULL, bstream_sz, PROT_READ|PROT_WRITE, MAP_SHARED, bstream_fd, 0 );
87 0 : if( FD_UNLIKELY( bstream_mem==MAP_FAILED ) ) {
88 0 : FD_LOG_ERR(( "mmap(sz=%lu,PROT_READ|PROT_WRITE,MAP_SHARED,path=%s,off=0) failed (%i-%s)",
89 0 : bstream_sz, bstream_path, errno, fd_io_strerror( errno ) ));
90 0 : }
91 :
92 0 : if( FD_UNLIKELY( 0!=close( bstream_fd ) ) ) { /* clean up unused fd */
93 0 : FD_LOG_ERR(( "close(fd=%i) failed (%i-%s)",
94 0 : bstream_fd, errno, fd_io_strerror( errno ) ));
95 0 : }
96 :
97 0 : ctx->vinyl.bstream_mem = bstream_mem;
98 0 : ctx->vinyl.bstream_sz = bstream_sz;
99 :
100 0 : FD_TEST( fd_rng_secure( &ctx->vinyl.io_seed, 8UL ) );
101 0 : }
102 :
103 : static void
104 0 : io_mm_align_4k( fd_snapwm_tile_t * ctx ) {
105 0 : fd_vinyl_io_t * io_mm = ctx->vinyl.io_mm;
106 0 : if( FD_UNLIKELY( io_mm->seq_future!=0UL ) ) {
107 0 : FD_LOG_CRIT(( "unexpected io_mm state (seq_future=%lu)", io_mm->seq_future ));
108 0 : }
109 0 : uchar * mmio = fd_vinyl_mmio ( io_mm );
110 0 : ulong mmio_sz = fd_vinyl_mmio_sz( io_mm );
111 :
112 0 : ulong bstream_preamble = fd_ulong_align_up( FD_VINYL_BSTREAM_BLOCK_SZ, 4096UL ) - FD_VINYL_BSTREAM_BLOCK_SZ;
113 0 : FD_CRIT( bstream_preamble<=mmio_sz, "bstream too small for 4k alignment" );
114 :
115 0 : fd_memset( mmio, 0, bstream_preamble );
116 0 : io_mm->seq_present += bstream_preamble;
117 0 : io_mm->seq_future += bstream_preamble;
118 0 : }
119 :
120 : void
121 : fd_snapwm_vinyl_unprivileged_init( fd_snapwm_tile_t * ctx,
122 : fd_topo_t * topo,
123 : fd_topo_tile_t * tile,
124 : void * io_mm_mem,
125 0 : void * io_wd_mem ) {
126 :
127 : /* Set up io_mm */
128 :
129 0 : ctx->vinyl.io_mm =
130 0 : fd_vinyl_io_mm_init( io_mm_mem,
131 0 : FD_SNAPWM_IO_SPAD_MAX,
132 0 : ctx->vinyl.bstream_mem,
133 0 : ctx->vinyl.bstream_sz,
134 0 : 1,
135 0 : "accounts-v0", 12UL,
136 0 : ctx->vinyl.io_seed );
137 0 : if( FD_UNLIKELY( !ctx->vinyl.io_mm ) ) {
138 0 : FD_LOG_ERR(( "fd_vinyl_io_mm_init failed" ));
139 0 : }
140 :
141 : /* Write out zero blocks to align the bstream by 4096 bytes
142 : (Assuming a 128 byte sync block) */
143 :
144 0 : io_mm_align_4k( ctx );
145 :
146 : /* Set up io_wd dependencies */
147 :
148 0 : ulong wr_link_id = fd_topo_find_tile_out_link( topo, tile, "snapwm_wh", 0UL );
149 0 : if( FD_UNLIKELY( wr_link_id==ULONG_MAX ) ) FD_LOG_CRIT(( "snapwm_wh link not found" ));
150 0 : fd_topo_link_t * wr_link = &topo->links[ tile->out_link_id[ wr_link_id ] ];
151 :
152 0 : if( FD_UNLIKELY( tile->snapwm.snapwr_depth != fd_mcache_depth( wr_link->mcache ) ) ) {
153 : /* FIXME TOCTOU issue ... A malicious downstream tile could
154 : theoretically corrupt mcache->depth and cause an OOB access
155 : while snapwm is still initializing. Practically not an
156 : issue because the system is not exposed to attacker-
157 : controlled input at boot time. */
158 0 : FD_LOG_CRIT(( "snapwm_wr link mcache depth %lu does not match snapwr_depth %lu",
159 0 : fd_mcache_depth( wr_link->mcache ), tile->snapwm.snapwr_depth ));
160 0 : }
161 :
162 0 : ulong expected_wr_link_consumers_cnt = fd_topo_tile_name_cnt( topo, "snapwh" );
163 0 : if( FD_UNLIKELY( fd_topo_link_reliable_consumer_cnt( topo, wr_link )!=expected_wr_link_consumers_cnt ) ) {
164 0 : FD_LOG_CRIT(( "snapwm_wr link must have exactly %lu reliable consumers", expected_wr_link_consumers_cnt ));
165 0 : }
166 :
167 0 : ulong const * wh_fseq[WD_WR_FSEQ_CNT_MAX];
168 0 : ulong wh_fseq_cnt = 0UL;
169 0 : ulong wh_fseq_cnt_expected = fd_topo_tile_name_cnt( topo, "snapwh" );
170 0 : FD_TEST( wh_fseq_cnt_expected<=WD_WR_FSEQ_CNT_MAX );
171 0 : FD_TEST( wh_fseq_cnt_expected==fd_topo_link_reliable_consumer_cnt( topo, wr_link ) );
172 0 : for( ulong tile_idx=0UL; tile_idx<topo->tile_cnt; tile_idx++ ) {
173 0 : fd_topo_tile_t const * consumer_tile = &topo->tiles[ tile_idx ];
174 0 : for( ulong in_idx=0UL; in_idx<consumer_tile->in_cnt; in_idx++ ) {
175 0 : if( consumer_tile->in_link_id[ in_idx ]==wr_link->id ) {
176 0 : FD_TEST( wh_fseq_cnt<WD_WR_FSEQ_CNT_MAX );
177 0 : wh_fseq[ wh_fseq_cnt ] = consumer_tile->in_link_fseq[ in_idx ];
178 0 : wh_fseq_cnt++;
179 0 : }
180 0 : }
181 0 : }
182 0 : if( FD_UNLIKELY( wh_fseq_cnt!=wh_fseq_cnt_expected ) ) {
183 0 : FD_LOG_ERR(( "unable to find %lu fseq(s) for output link %s:%lu",
184 0 : wh_fseq_cnt, wr_link->name, wr_link->kind_id ));
185 0 : }
186 :
187 : /* Set up io_wd */
188 :
189 0 : ctx->vinyl.io_wd =
190 0 : fd_vinyl_io_wd_init( io_wd_mem,
191 0 : ctx->vinyl.bstream_sz,
192 0 : ctx->vinyl.io_mm->seed,
193 0 : wr_link->mcache,
194 0 : wr_link->dcache,
195 0 : wh_fseq,
196 0 : wh_fseq_cnt,
197 0 : wr_link->mtu,
198 0 : tile->snapwm.vinyl_path );
199 0 : if( FD_UNLIKELY( !ctx->vinyl.io_wd ) ) {
200 0 : FD_LOG_ERR(( "fd_vinyl_io_wd_init failed" ));
201 0 : }
202 :
203 : /* Start by using io_mm */
204 :
205 0 : ctx->vinyl.io = ctx->vinyl.io_mm;
206 :
207 0 : ctx->vinyl.duplicate_accounts_batch_sz = 0UL;
208 0 : ctx->vinyl.duplicate_accounts_batch_cnt = 0UL;
209 :
210 0 : ctx->vinyl.pair_cnt = 0UL;
211 0 : ctx->vinyl.pair_cnt_max = tile->snapwm.max_accounts;
212 :
213 0 : fd_lthash_adder_new( &ctx->vinyl.adder );
214 0 : fd_lthash_zero( &ctx->vinyl.running_lthash );
215 :
216 0 : ulong wr_cnt = fd_topo_tile_name_cnt( topo, "snapwr" );
217 0 : ctx->vinyl.wr_cnt = wr_cnt;
218 :
219 0 : ctx->vinyl.admin = NULL;
220 0 : if( FD_LIKELY( !ctx->lthash_disabled ) ) {
221 0 : ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
222 0 : FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
223 0 : fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
224 0 : FD_TEST( vinyl_admin );
225 0 : ctx->vinyl.admin = vinyl_admin;
226 :
227 : /* There is no need for rw_lock here, since every other consumer
228 : is waiting for the completion of this initialization step and
229 : this can be done without a lock. */
230 0 : FD_TEST( fd_snapwm_vinyl_init_admin( ctx, 0/*do_rwlock*/ ) );
231 0 : }
232 :
233 0 : ctx->vinyl.txn_active = 0;
234 0 : ctx->vinyl.txn_commit = 0;
235 0 : }
236 :
237 : ulong
238 : fd_snapwm_vinyl_seccomp( ulong out_cnt,
239 0 : struct sock_filter * out ) {
240 0 : populate_sock_filter_policy_fd_snapwm_tile_vinyl( out_cnt, out, (uint)fd_log_private_logfile_fd() );
241 0 : return sock_filter_policy_fd_snapwm_tile_vinyl_instr_cnt;
242 0 : }
243 :
244 : static void
245 0 : vinyl_mm_sync( fd_snapwm_tile_t * ctx ) {
246 0 : if( FD_UNLIKELY( 0!=msync( ctx->vinyl.bstream_mem, ctx->vinyl.bstream_sz, MS_SYNC|MS_INVALIDATE ) ) ) {
247 0 : FD_LOG_ERR(( "msync(addr=%p,sz=%lu,MS_SYNC|MS_INVALIDATE) failed (%i-%s)",
248 0 : (void *)ctx->vinyl.bstream_mem, ctx->vinyl.bstream_sz,
249 0 : errno, fd_io_strerror( errno ) ));
250 0 : }
251 0 : }
252 :
253 : /* Faster vinyl meta accesses *****************************************/
254 :
255 : static fd_vinyl_meta_ele_t *
256 : fd_vinyl_meta_prepare_nolock( fd_vinyl_meta_t * join,
257 : fd_vinyl_key_t const * key,
258 0 : ulong memo ) {
259 0 : fd_vinyl_meta_ele_t * ele0 = join->ele;
260 0 : ulong ele_max = join->ele_max;
261 0 : ulong probe_max = join->probe_max;
262 0 : void * ctx = join->ctx;
263 :
264 0 : ulong start_idx = memo & (ele_max-1UL);
265 :
266 0 : for(;;) {
267 :
268 0 : ulong ele_idx = start_idx;
269 :
270 0 : for( ulong probe_rem=probe_max; probe_rem; probe_rem-- ) {
271 0 : fd_vinyl_meta_ele_t * ele = ele0 + ele_idx;
272 :
273 0 : if( FD_LIKELY( fd_vinyl_meta_private_ele_is_free( ctx, ele ) ) || /* opt for low collision */
274 0 : (
275 0 : FD_LIKELY( ele->memo==memo ) &&
276 : FD_LIKELY( fd_vinyl_key_eq( &ele->phdr.key, key ) ) /* opt for already in map */
277 0 : ) ) {
278 0 : return ele;
279 0 : }
280 :
281 0 : ele_idx = (ele_idx+1UL) & (ele_max-1UL);
282 0 : }
283 :
284 0 : return NULL;
285 :
286 0 : }
287 :
288 : /* never get here */
289 0 : }
290 :
291 : /* Transactional APIs *************************************************/
292 :
293 : void
294 0 : fd_snapwm_vinyl_txn_begin( fd_snapwm_tile_t * ctx ) {
295 0 : FD_CRIT( !ctx->vinyl.txn_active, "txn_begin called while already in txn" );
296 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
297 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
298 :
299 0 : if( FD_UNLIKELY( ctx->vinyl.txn_commit ) ) {
300 0 : FD_LOG_CRIT(( "unable to perform txn_begin after a completed txn_commit" ));
301 0 : return;
302 0 : }
303 :
304 : /* Finish any outstanding writes */
305 0 : int commit_err = fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
306 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
307 :
308 0 : ctx->vinyl.txn_seq = io->seq_present;
309 0 : ctx->vinyl.txn_active = 1;
310 0 : }
311 :
312 : FD_FN_UNUSED static void
313 : streamlined_hash( fd_snapwm_tile_t * restrict ctx,
314 : fd_lthash_adder_t * restrict adder,
315 : fd_lthash_value_t * restrict running_lthash,
316 0 : uchar const * restrict _pair ) {
317 0 : uchar const * pair = _pair;
318 0 : fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t const *)pair;
319 0 : pair += sizeof(fd_vinyl_bstream_phdr_t);
320 0 : fd_account_meta_t const * meta = (fd_account_meta_t const *)pair;
321 0 : pair += sizeof(fd_account_meta_t);
322 0 : uchar const * data = pair;
323 :
324 0 : ulong data_len = meta->dlen;
325 0 : const char * pubkey = phdr->key.c;
326 0 : ulong lamports = meta->lamports;
327 0 : const uchar * owner = meta->owner;
328 0 : uchar executable = (uchar)( !meta->executable ? 0U : 1U) ;
329 :
330 0 : if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
331 0 : if( FD_UNLIKELY( lamports==0UL ) ) return;
332 :
333 0 : fd_lthash_adder_push_solana_account( adder,
334 0 : running_lthash,
335 0 : pubkey,
336 0 : data,
337 0 : data_len,
338 0 : lamports,
339 0 : executable,
340 0 : owner );
341 0 : ctx->vinyl.running_capitalization = fd_ulong_sat_add( ctx->vinyl.running_capitalization, lamports );
342 0 : }
343 :
344 : void
345 : fd_snapwm_vinyl_txn_commit( fd_snapwm_tile_t * ctx,
346 0 : fd_stem_context_t * stem ) {
347 0 : FD_CRIT( ctx->vinyl.txn_active, "txn_commit called while not in txn" );
348 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
349 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
350 :
351 0 : long dt = -fd_log_wallclock();
352 :
353 : /* Finish any outstanding writes */
354 :
355 0 : int commit_err = fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
356 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
357 :
358 : /* Hint to kernel to start prefetching to speed up reads */
359 :
360 0 : uchar * mmio = fd_vinyl_mmio ( io ); FD_TEST( mmio );
361 0 : ulong mmio_sz = fd_vinyl_mmio_sz( io );
362 :
363 0 : ulong txn_seq0 = ctx->vinyl.txn_seq;
364 0 : ulong txn_seq1 = ctx->vinyl.io_mm->seq_present;
365 0 : FD_LOG_INFO(( "vinyl txn_commit starting for seq [%lu,%lu)", txn_seq0, txn_seq1 ));
366 0 : ulong txn_sz = txn_seq1-txn_seq0;
367 0 : FD_CRIT( fd_vinyl_seq_le( txn_seq0, txn_seq1 ), "invalid txn seq range" );
368 0 : FD_CRIT( txn_seq1 <= mmio_sz, "invalid txn seq range" );
369 0 : if( FD_UNLIKELY( fd_vinyl_seq_eq( txn_seq0, txn_seq1 ) ) ) return;
370 :
371 0 : void * madv_base = (void *)fd_ulong_align_dn( (ulong)mmio+txn_seq0, FD_SHMEM_NORMAL_PAGE_SZ );
372 0 : ulong madv_sz = /* */fd_ulong_align_up( txn_sz, FD_SHMEM_NORMAL_PAGE_SZ );
373 0 : if( FD_UNLIKELY( madvise( madv_base, madv_sz, MADV_SEQUENTIAL ) ) ) {
374 0 : FD_LOG_WARNING(( "madvise(addr=%p,sz=%lu,MADV_SEQUENTIAL) failed (%i-%s)",
375 0 : madv_base, madv_sz,
376 0 : errno, fd_io_strerror( errno ) ));
377 0 : }
378 :
379 : /* Replay incremental account updates */
380 0 : fd_snapwm_vinyl_duplicate_accounts_batch_init( ctx, stem );
381 0 : fd_snapwm_vinyl_duplicate_accounts_lthash_init( ctx, stem );
382 0 : ulong dup_batch_cnt = 0UL;
383 :
384 0 : fd_vinyl_meta_t * meta_map = ctx->vinyl.map;
385 0 : for( ulong seq=txn_seq0; fd_vinyl_seq_lt( seq, txn_seq1 ); ) {
386 0 : fd_vinyl_bstream_block_t * block = (void *)( mmio+seq );
387 :
388 : /* Speculatively read block info */
389 0 : ulong ctl = FD_VOLATILE_CONST( block->ctl );
390 0 : fd_vinyl_bstream_phdr_t phdr = FD_VOLATILE_CONST( block->phdr );
391 :
392 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
393 0 : int block_type = fd_vinyl_bstream_ctl_type( ctl );
394 0 : ulong block_sz;
395 :
396 0 : if( FD_LIKELY( block_type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR ) ) {
397 0 : block_sz = fd_vinyl_bstream_pair_sz( val_esz );
398 0 : ulong memo = fd_vinyl_key_memo( meta_map->seed, &phdr.key );
399 0 : fd_vinyl_meta_ele_t * ele = fd_vinyl_meta_prepare_nolock( meta_map, &phdr.key, memo );
400 0 : if( FD_UNLIKELY( !ele ) ) FD_LOG_CRIT(( "fd_vinyl_meta_prepare failed (full)" ));
401 :
402 : /* Erase value if existing is newer */
403 0 : if( FD_UNLIKELY( fd_vinyl_meta_ele_in_use( ele ) ) ) { /* key exists */
404 0 : ulong exist_slot = fd_snapin_vinyl_pair_info_slot( &ele->phdr.info );
405 0 : ulong cur_slot = fd_snapin_vinyl_pair_info_slot( &phdr.info );
406 0 : if( exist_slot > cur_slot ) {
407 0 : ctx->metrics.accounts_ignored++;
408 0 : FD_COMPILER_MFENCE();
409 0 : fd_snapwm_vinyl_duplicate_accounts_lthash_append( ctx, (uchar*)block/*pair*/ );
410 0 : FD_COMPILER_MFENCE();
411 0 : fd_memset( block, 0, block_sz );
412 0 : goto next;
413 0 : } else {
414 0 : dup_batch_cnt += (ulong)fd_snapwm_vinyl_duplicate_accounts_batch_append( ctx, &ele->phdr, ele->seq );
415 0 : }
416 0 : ctx->metrics.accounts_replaced++;
417 0 : } else {
418 0 : if( FD_UNLIKELY( ctx->vinyl.pair_cnt++ > ctx->vinyl.pair_cnt_max ) ) {
419 0 : FD_LOG_ERR(( "failed to load snapshot: exceeded [accounts.max_accounts] (%lu)", ctx->vinyl.pair_cnt_max ));
420 0 : }
421 0 : }
422 :
423 : /* Overwrite map entry */
424 0 : ele->memo = memo;
425 0 : ele->phdr = phdr;
426 0 : ele->seq = seq;
427 0 : ele->line_idx = ULONG_MAX;
428 0 : } else if( block_type==FD_VINYL_BSTREAM_CTL_TYPE_ZPAD ) {
429 0 : block_sz = FD_VINYL_BSTREAM_BLOCK_SZ;
430 0 : } else {
431 0 : FD_LOG_CRIT(( "unexpected block type %d", block_type ));
432 0 : }
433 :
434 0 : if( FD_UNLIKELY( !block_sz ) ) {
435 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx (zero block_sz)", seq, ctl ));
436 0 : }
437 0 : if( FD_UNLIKELY( block_sz > 64UL<<20 ) ) {
438 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx, block_sz=%lu (unreasonably large block size)", seq, ctl, block_sz ));
439 0 : }
440 :
441 0 : next:
442 0 : seq += block_sz;
443 :
444 0 : if( FD_UNLIKELY( dup_batch_cnt >= FD_SNAPWM_DUP_META_BATCH_CNT_MAX ) ) {
445 0 : fd_snapwm_vinyl_duplicate_accounts_batch_fini( ctx, stem );
446 0 : FD_COMPILER_MFENCE();
447 0 : fd_snapwm_vinyl_duplicate_accounts_batch_init( ctx, stem );
448 0 : dup_batch_cnt = 0UL;
449 0 : }
450 0 : }
451 :
452 : /* Batch fini must be invoked before lthash fini for two reasons:
453 : the batch still needs to be processed downstream and there should
454 : be no fd_stem_publish between batch init and fini. */
455 0 : fd_snapwm_vinyl_duplicate_accounts_batch_fini( ctx, stem );
456 0 : FD_COMPILER_MFENCE();
457 0 : fd_snapwm_vinyl_duplicate_accounts_lthash_fini( ctx, stem );
458 :
459 : /* Persist above erases to disk */
460 :
461 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
462 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
463 0 : vinyl_mm_sync( ctx );
464 :
465 0 : ctx->vinyl.txn_commit = 1;
466 :
467 0 : dt += fd_log_wallclock();
468 0 : FD_LOG_INFO(( "vinyl txn_commit took %g seconds", (double)dt/1e9 ));
469 0 : }
470 :
471 : void
472 0 : fd_snapwm_vinyl_txn_cancel( fd_snapwm_tile_t * ctx ) {
473 0 : FD_CRIT( ctx->vinyl.txn_active, "txn_cancel called while not in txn" );
474 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
475 :
476 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
477 0 : fd_vinyl_io_rewind( io, ctx->vinyl.txn_seq );
478 0 : fd_vinyl_io_sync ( io, FD_VINYL_IO_FLAG_BLOCKING );
479 0 : }
480 :
481 : /* Fast writer ********************************************************/
482 :
483 : void
484 0 : fd_snapwm_vinyl_wd_init( fd_snapwm_tile_t * ctx ) {
485 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
486 :
487 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
488 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io_mm) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
489 :
490 : /* Flush io_mm */
491 :
492 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
493 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
494 0 : vinyl_mm_sync( ctx );
495 :
496 : /* Synchronize sequence numbers */
497 :
498 0 : ctx->vinyl.io_wd->seq_ancient = ctx->vinyl.io_mm->seq_ancient;
499 0 : ctx->vinyl.io_wd->seq_past = ctx->vinyl.io_mm->seq_past;
500 0 : ctx->vinyl.io_wd->seq_present = ctx->vinyl.io_mm->seq_present;
501 0 : ctx->vinyl.io_wd->seq_future = ctx->vinyl.io_mm->seq_future;
502 0 : ctx->vinyl.io_wd->spad_used = 0UL;
503 :
504 0 : ctx->vinyl.io = ctx->vinyl.io_wd;
505 0 : }
506 :
507 : void
508 0 : fd_snapwm_vinyl_wd_fini( fd_snapwm_tile_t * ctx ) {
509 0 : if( FD_UNLIKELY( ctx->vinyl.io!=ctx->vinyl.io_wd ) ) return;
510 :
511 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io_wd, FD_VINYL_IO_FLAG_BLOCKING );
512 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io_wd) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
513 :
514 : /* Synchronize sequence numbers */
515 :
516 0 : ctx->vinyl.io_mm->seq_ancient = ctx->vinyl.io_wd->seq_ancient;
517 0 : ctx->vinyl.io_mm->seq_past = ctx->vinyl.io_wd->seq_past;
518 0 : ctx->vinyl.io_mm->seq_present = ctx->vinyl.io_wd->seq_present;
519 0 : ctx->vinyl.io_mm->seq_future = ctx->vinyl.io_wd->seq_future;
520 0 : ctx->vinyl.io_mm->spad_used = 0UL;
521 :
522 0 : ctx->vinyl.io = ctx->vinyl.io_mm;
523 :
524 0 : vinyl_mm_sync( ctx );
525 0 : }
526 :
527 : /* bstream_alloc is a faster version of fd_vinyl_io_alloc. Indirect
528 : calls have significant overhead on Zen 5. */
529 :
530 : static uchar *
531 : bstream_alloc( fd_vinyl_io_t * io,
532 : ulong sz,
533 0 : int flags ) {
534 0 : if( FD_LIKELY( io->impl==&fd_vinyl_io_wd_impl ) )
535 0 : return fd_vinyl_io_wd_alloc( io, sz, flags );
536 0 : return fd_vinyl_io_alloc( io, sz, flags );
537 0 : }
538 :
539 : /* fd_snapwm_vinyl_process_account reads and processes a batch of
540 : pre-generated bstream pairs, handles the meta_map, and determines
541 : whether to forward each of the accounts (pairs) to the database. */
542 :
543 : void
544 : fd_snapwm_vinyl_process_account( fd_snapwm_tile_t * ctx,
545 : ulong chunk,
546 : ulong acc_cnt,
547 0 : fd_stem_context_t * stem ) {
548 0 : fd_vinyl_io_t * io = ctx->vinyl.io;
549 0 : fd_vinyl_meta_t * map = ctx->vinyl.map;
550 :
551 0 : uchar * src = fd_chunk_to_laddr( ctx->in.wksp, chunk );
552 :
553 0 : fd_snapwm_vinyl_duplicate_accounts_batch_init( ctx, stem );
554 :
555 0 : for( ulong acc_i=0UL; acc_i<acc_cnt; acc_i++ ) {
556 :
557 0 : fd_vinyl_bstream_phdr_t * src_phdr = (fd_vinyl_bstream_phdr_t*)src;
558 : /* phdr's recovery_seq may need to be updated, and this cannot
559 : happen on the src dcache. */
560 0 : fd_vinyl_bstream_phdr_t phdr = *src_phdr;
561 :
562 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( phdr.ctl );
563 :
564 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
565 0 : uchar * pair = bstream_alloc( io, pair_sz, FD_VINYL_IO_FLAG_BLOCKING );
566 0 : uchar * dst = pair;
567 :
568 0 : ulong const account_header_slot = fd_snapin_vinyl_pair_info_slot( &phdr.info );
569 :
570 0 : ctx->metrics.accounts_loaded++;
571 :
572 0 : int do_meta_update = ctx->full || !ctx->lthash_disabled;
573 :
574 0 : ulong recovery_seq = 0UL;
575 :
576 0 : fd_vinyl_meta_ele_t * ele = NULL;
577 0 : if( FD_LIKELY( do_meta_update ) ) {
578 0 : ulong memo = fd_vinyl_key_memo( map->seed, &phdr.key );
579 0 : ele = fd_vinyl_meta_prepare_nolock( map, &phdr.key, memo );
580 0 : if( FD_UNLIKELY( !ele ) ) FD_LOG_CRIT(( "Failed to update vinyl index (full)" ));
581 :
582 0 : if( FD_UNLIKELY( fd_vinyl_meta_ele_in_use( ele ) ) ) {
583 : /* Drop current value if existing is newer */
584 0 : ulong const exist_slot = fd_snapin_vinyl_pair_info_slot( &ele->phdr.info );
585 0 : if( FD_UNLIKELY( exist_slot > account_header_slot ) ) {
586 0 : ctx->metrics.accounts_ignored++;
587 0 : src += pair_sz;
588 0 : continue;
589 0 : } else {
590 0 : fd_snapwm_vinyl_duplicate_accounts_batch_append( ctx, &ele->phdr, ele->seq );
591 0 : recovery_seq = ele->seq;
592 0 : ctx->metrics.accounts_replaced++;
593 0 : }
594 0 : } else {
595 0 : if( FD_UNLIKELY( ctx->vinyl.pair_cnt++ > ctx->vinyl.pair_cnt_max ) ) {
596 0 : FD_LOG_ERR(( "failed to load snapshot: exceeded [accounts.max_accounts] (%lu)", ctx->vinyl.pair_cnt_max ));
597 0 : }
598 0 : }
599 :
600 0 : fd_snapin_vinyl_pair_info_update_recovery_seq( &phdr.info, recovery_seq );
601 0 : ele->memo = memo;
602 0 : ele->phdr.ctl = phdr.ctl;
603 0 : ele->phdr.key = phdr.key;
604 0 : ele->phdr.info = phdr.info;
605 0 : ele->seq = ULONG_MAX; /* later init */
606 0 : ele->line_idx = ULONG_MAX;
607 0 : }
608 :
609 : /* sizeof(fd_vinyl_bstream_phdr_t) is less than the minimum
610 : pair_sz==FD_VINYL_BSTREAM_BLOCK_SZ. */
611 0 : ulong off = sizeof(fd_vinyl_bstream_phdr_t);
612 0 : fd_memcpy( dst, &phdr, off );
613 0 : fd_memcpy( dst+off, src+off, pair_sz-off );
614 0 : src += pair_sz;
615 :
616 0 : ulong seq_after = fd_vinyl_io_append( io, pair, pair_sz );
617 0 : if( FD_LIKELY( do_meta_update ) ) ele->seq = seq_after;
618 0 : }
619 :
620 0 : fd_snapwm_vinyl_duplicate_accounts_batch_fini( ctx, stem );
621 0 : }
622 :
623 : void
624 0 : fd_snapwm_vinyl_shutdown( fd_snapwm_tile_t * ctx ) {
625 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io, FD_VINYL_IO_FLAG_BLOCKING );
626 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
627 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
628 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
629 0 : vinyl_mm_sync( ctx );
630 :
631 0 : fd_vinyl_io_wd_ctrl( ctx->vinyl.io_wd, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL );
632 0 : }
633 :
634 : void
635 : fd_snapwm_vinyl_read_account( fd_snapwm_tile_t * ctx,
636 : void const * acct_addr,
637 : fd_account_meta_t * meta,
638 : uchar * data,
639 0 : ulong data_max ) {
640 0 : if( FD_UNLIKELY( ctx->vinyl.io!=ctx->vinyl.io_mm ) ) {
641 0 : FD_LOG_CRIT(( "vinyl not in io_mm mode" ));
642 0 : }
643 :
644 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
645 :
646 : /* Query database index */
647 :
648 0 : fd_vinyl_key_t key[1];
649 0 : fd_vinyl_key_init( key, acct_addr, 32UL );
650 0 : ulong memo = fd_vinyl_key_memo( ctx->vinyl.map->seed, key );
651 0 : fd_vinyl_meta_ele_t const * ele = fd_vinyl_meta_prepare_nolock( ctx->vinyl.map, key, memo );
652 0 : if( FD_UNLIKELY( !ele || !fd_vinyl_meta_ele_in_use( ele ) ) ) {
653 : /* account not found */
654 0 : return;
655 0 : }
656 :
657 0 : uchar * mmio = fd_vinyl_mmio ( ctx->vinyl.io_mm );
658 0 : ulong mmio_sz = fd_vinyl_mmio_sz( ctx->vinyl.io_mm );
659 :
660 : /* Validate index record */
661 :
662 0 : ulong const seq0 = ele->seq;
663 0 : ulong const ctl = ele->phdr.ctl;
664 0 : int const ctl_type = fd_vinyl_bstream_ctl_type( ctl );
665 0 : ulong const val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
666 0 : ulong const pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
667 0 : ulong const seq1 = seq0 + pair_sz;
668 0 : ulong const seq_past = ctx->vinyl.io->seq_past;
669 0 : ulong const seq_present = ctx->vinyl.io->seq_present;
670 0 : if( FD_UNLIKELY( ctl_type!=FD_VINYL_BSTREAM_CTL_TYPE_PAIR ) ) {
671 0 : FD_LOG_CRIT(( "corrupt bstream record in index: ctl=%016lx", ctl ));
672 0 : }
673 0 : if( FD_UNLIKELY( val_esz<sizeof(fd_account_meta_t) ||
674 0 : val_esz>sizeof(fd_account_meta_t)+FD_RUNTIME_ACC_SZ_MAX ) ) {
675 0 : FD_LOG_CRIT(( "corrupt bstream record in index: val_esz=%lu", val_esz ));
676 0 : }
677 0 : int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));
678 0 : if( FD_UNLIKELY( bad_past ) ) {
679 0 : FD_LOG_CRIT(( "corrupt bstream record in index: seq[%lu,%lu) not in [seq_past=%lu,seq_present=%lu)",
680 0 : seq0, seq1, seq_past, seq_present ));
681 0 : }
682 :
683 : /* Map seq range to underlying device
684 : In the snapshot loader, it is safe to assume that bstream reads
685 : do not wrap around. */
686 :
687 0 : if( FD_UNLIKELY( seq1>mmio_sz ) ) {
688 0 : FD_LOG_CRIT(( "corrupt bstream record in index: seq[%lu,%lu) exceeds bstream addressable range [0,%lu)",
689 0 : seq0, seq1, mmio_sz ));
690 0 : }
691 :
692 : /* Read from bstream */
693 :
694 0 : ulong seq_meta = seq0 + sizeof(fd_vinyl_bstream_phdr_t);
695 0 : ulong seq_data = seq_meta + sizeof(fd_account_meta_t);
696 :
697 0 : memcpy( meta, mmio+seq_meta, sizeof(fd_account_meta_t) );
698 0 : if( FD_UNLIKELY( sizeof(fd_account_meta_t)+(ulong)meta->dlen > val_esz ) ) {
699 0 : FD_LOG_CRIT(( "corrupt bstream record: seq0=%lu val_esz=%lu dlen=%u", seq0, val_esz, meta->dlen ));
700 0 : }
701 0 : if( FD_UNLIKELY( meta->dlen > data_max ) ) {
702 0 : FD_BASE58_ENCODE_32_BYTES( acct_addr, acct_addr_b58 );
703 0 : FD_LOG_CRIT(( "failed to read account %s: account data size (%lu bytes) exceeds buffer size (%lu bytes)",
704 0 : acct_addr_b58, (ulong)meta->dlen, data_max ));
705 0 : }
706 0 : memcpy( data, mmio+seq_data, meta->dlen );
707 0 : }
708 :
709 : /* handle_hash_out_fseq_check is a blocking operation */
710 : static inline void
711 : handle_hash_out_fseq_check( fd_snapwm_tile_t * ctx,
712 : fd_stem_context_t * stem,
713 0 : ulong min_credit ) {
714 0 : ulong producer_fseq = fd_fseq_query( &stem->seqs[ ctx->hash_out.idx ] );
715 0 : ulong consumer_fseq = fd_fseq_query( ctx->hash_out.consumer_fseq );
716 0 : for(;;) {
717 0 : ulong avail = ctx->hash_out.depth - ( producer_fseq - consumer_fseq );
718 0 : if( FD_LIKELY( avail > min_credit ) ) break;
719 0 : FD_SPIN_PAUSE();
720 0 : consumer_fseq = fd_fseq_query( ctx->hash_out.consumer_fseq );
721 0 : }
722 0 : }
723 :
724 : int
725 : fd_snapwm_vinyl_duplicate_accounts_batch_init( fd_snapwm_tile_t * ctx,
726 0 : fd_stem_context_t * stem ) {
727 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
728 0 : ctx->vinyl.duplicate_accounts_batch_sz = 0UL;
729 0 : ctx->vinyl.duplicate_accounts_batch_cnt = 0UL;
730 :
731 : /* fseq check is mandatory here, since append writes directly to
732 : the dcache. */
733 0 : handle_hash_out_fseq_check( ctx, stem, FD_SNAPWM_DUP_BATCH_CREDIT_MIN );
734 0 : return 1;
735 0 : }
736 :
737 : int
738 : fd_snapwm_vinyl_duplicate_accounts_batch_append( fd_snapwm_tile_t * ctx,
739 : fd_vinyl_bstream_phdr_t * phdr,
740 0 : ulong seq ) {
741 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
742 0 : uchar * data = fd_chunk_to_laddr( ctx->hash_out.mem, ctx->hash_out.chunk );
743 0 : data += ctx->vinyl.duplicate_accounts_batch_sz; /* offset into the chunk */
744 0 : memcpy( data, &seq, sizeof(ulong) );
745 0 : memcpy( data + sizeof(ulong), phdr, sizeof(fd_vinyl_bstream_phdr_t) );
746 0 : ctx->vinyl.duplicate_accounts_batch_sz += FD_SNAPWM_DUP_META_SZ;
747 0 : ctx->vinyl.duplicate_accounts_batch_cnt +=1UL;
748 0 : return 1;
749 0 : }
750 :
751 : int
752 : fd_snapwm_vinyl_duplicate_accounts_batch_fini( fd_snapwm_tile_t * ctx,
753 0 : fd_stem_context_t * stem ) {
754 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
755 :
756 : /* There is no fseq check in batch_fini. This is a performance
757 : optimization, which requires no other fd_stem_publish on the
758 : output link in between init and fini. */
759 :
760 0 : ulong batch_sz = ctx->vinyl.duplicate_accounts_batch_sz;
761 0 : ulong batch_cnt = ctx->vinyl.duplicate_accounts_batch_cnt;
762 0 : if( FD_UNLIKELY( batch_cnt>FD_SSPARSE_ACC_BATCH_MAX ) ) {
763 0 : FD_LOG_CRIT(( "batch_cnt %lu exceeds FD_SSPARSE_ACC_BATCH_MAX %lu", batch_cnt, FD_SSPARSE_ACC_BATCH_MAX ));
764 0 : }
765 0 : if( FD_UNLIKELY( !batch_sz ) ) return 0;
766 0 : fd_stem_publish( stem, ctx->hash_out.idx, FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH, ctx->hash_out.chunk, batch_sz, 0UL, 0UL, batch_cnt/*tspub*/ );
767 0 : ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, batch_sz, ctx->hash_out.chunk0, ctx->hash_out.wmark );
768 0 : return 1;
769 0 : }
770 :
771 : int
772 : fd_snapwm_vinyl_duplicate_accounts_lthash_init( fd_snapwm_tile_t * ctx,
773 0 : fd_stem_context_t * stem ) {
774 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
775 0 : fd_lthash_zero( &ctx->vinyl.running_lthash );
776 0 : ctx->vinyl.running_capitalization = 0UL;
777 :
778 0 : (void)stem;
779 : /* There is no fseq check in lthash_init, since append uses internal
780 : adder and running_lthash, without accessing the dcache. */
781 0 : return 1;
782 0 : }
783 :
784 : int
785 : fd_snapwm_vinyl_duplicate_accounts_lthash_append( fd_snapwm_tile_t * ctx,
786 0 : uchar * pair ) {
787 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
788 0 : streamlined_hash( ctx, &ctx->vinyl.adder, &ctx->vinyl.running_lthash, pair );
789 0 : return 1;
790 0 : }
791 :
792 : int
793 : fd_snapwm_vinyl_duplicate_accounts_lthash_fini( fd_snapwm_tile_t * ctx,
794 0 : fd_stem_context_t * stem ) {
795 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
796 :
797 : /* fseq check is mandatory here. */
798 0 : handle_hash_out_fseq_check( ctx, stem, FD_SNAPWM_DUP_LTHASH_CREDIT_MIN );
799 :
800 0 : fd_lthash_adder_flush( &ctx->vinyl.adder, &ctx->vinyl.running_lthash );
801 0 : fd_ssctrl_hash_result_t * res = fd_chunk_to_laddr( ctx->hash_out.mem, ctx->hash_out.chunk );
802 0 : fd_memcpy( res->lthash.bytes, &ctx->vinyl.running_lthash, FD_LTHASH_LEN_BYTES );
803 0 : res->capitalization = (long)ctx->vinyl.running_capitalization;
804 0 : fd_stem_publish( stem, ctx->hash_out.idx, FD_SNAPSHOT_HASH_MSG_RESULT_SUB, ctx->hash_out.chunk, sizeof(fd_ssctrl_hash_result_t), 0UL, 0UL, 0UL );
805 0 : ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, sizeof(fd_ssctrl_hash_result_t), ctx->hash_out.chunk0, ctx->hash_out.wmark );
806 0 : return 1;
807 0 : }
808 :
809 : int
810 : fd_snapwm_vinyl_init_admin( fd_snapwm_tile_t * ctx,
811 0 : int do_rwlock ) {
812 0 : if( FD_UNLIKELY( !!do_rwlock ) ) fd_rwlock_write( &ctx->vinyl.admin->lock );
813 :
814 0 : ulong status = fd_vinyl_admin_ulong_query( &ctx->vinyl.admin->status );
815 0 : if( FD_UNLIKELY( status!=FD_VINYL_ADMIN_STATUS_INIT_PENDING ) ) {
816 0 : FD_LOG_WARNING(( "vinyl admin unexpected status %s (%lu) during initialization",
817 0 : fd_vinyl_admin_status_str( status ), status ));
818 0 : goto init_admin_error;
819 0 : }
820 :
821 0 : if( FD_UNLIKELY( !ctx->vinyl.wr_cnt ) ) {
822 0 : FD_LOG_WARNING(( "vinyl admin sees unexpected write tile count %lu", ctx->vinyl.wr_cnt ));
823 0 : goto init_admin_error;
824 0 : }
825 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->wr_cnt, ctx->vinyl.wr_cnt );
826 :
827 0 : for( ulong i=0UL; i<ctx->vinyl.wr_cnt; i++ ) {
828 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->wr_seq[ i ], 0UL );
829 0 : }
830 :
831 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->status, FD_VINYL_ADMIN_STATUS_INIT_DONE );
832 :
833 0 : if( FD_UNLIKELY( !!do_rwlock ) ) fd_rwlock_unwrite( &ctx->vinyl.admin->lock );
834 0 : return 1;
835 :
836 0 : init_admin_error:
837 0 : if( FD_UNLIKELY( !!do_rwlock ) ) fd_rwlock_unwrite( &ctx->vinyl.admin->lock );
838 0 : return 0;
839 0 : }
840 :
841 : int
842 : fd_snapwm_vinyl_update_admin( fd_snapwm_tile_t * ctx,
843 0 : int do_rwlock ) {
844 0 : if( FD_UNLIKELY( !!do_rwlock ) ) fd_rwlock_write( &ctx->vinyl.admin->lock );
845 :
846 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->status, FD_VINYL_ADMIN_STATUS_UPDATING );
847 :
848 0 : for( ulong i=0UL; i<ctx->vinyl.wr_cnt; i++ ) {
849 : /* This may cause a wr_seq[ i ] regression, which is expected e.g.
850 : if the snapshot load pipeline aborts the current snapshot and
851 : resets to load a new one. */
852 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->wr_seq[ i ], ctx->vinyl.io_wd->seq_present );
853 0 : }
854 :
855 0 : ulong status = fd_ulong_if( ctx->full, FD_VINYL_ADMIN_STATUS_SNAPSHOT_FULL, FD_VINYL_ADMIN_STATUS_SNAPSHOT_INCR );
856 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->status, status );
857 :
858 0 : if( FD_UNLIKELY( !!do_rwlock ) ) fd_rwlock_unwrite( &ctx->vinyl.admin->lock );
859 0 : return 1;
860 0 : }
861 :
862 : void
863 0 : fd_snapwm_vinyl_recovery_seq_backup( fd_snapwm_tile_t * ctx ) {
864 0 : ctx->vinyl.recovery.seq_ancient = ctx->vinyl.io_mm->seq_ancient;
865 0 : ctx->vinyl.recovery.seq_past = ctx->vinyl.io_mm->seq_past;
866 0 : ctx->vinyl.recovery.seq_present = ctx->vinyl.io_mm->seq_present;
867 0 : ctx->vinyl.recovery.seq_future = ctx->vinyl.io_mm->seq_future;
868 0 : }
869 :
870 : void
871 0 : fd_snapwm_vinyl_recovery_seq_apply( fd_snapwm_tile_t * ctx ) {
872 0 : ctx->vinyl.io_mm->seq_ancient = ctx->vinyl.recovery.seq_ancient;
873 0 : ctx->vinyl.io_mm->seq_past = ctx->vinyl.recovery.seq_past;
874 0 : ctx->vinyl.io_mm->seq_present = ctx->vinyl.recovery.seq_present;
875 0 : ctx->vinyl.io_mm->seq_future = ctx->vinyl.recovery.seq_future;
876 0 : }
877 :
878 : void
879 0 : fd_snapwm_vinyl_revert_full( fd_snapwm_tile_t * ctx ) {
880 0 : fd_vinyl_meta_t * map = ctx->vinyl.map;
881 0 : fd_vinyl_meta_ele_t * ele0 = map->ele;
882 0 : ulong ele_max = map->ele_max;
883 0 : void * map_ctx = map->ctx;
884 :
885 0 : long dt = -fd_log_wallclock();
886 0 : for( ulong ele_idx=0; ele_idx<ele_max; ele_idx++ ) {
887 0 : fd_vinyl_meta_ele_t * ele = ele0 + ele_idx;
888 0 : fd_vinyl_meta_private_ele_free( map_ctx, ele );
889 0 : }
890 :
891 : /* Apply changes and resync */
892 0 : fd_snapwm_vinyl_recovery_seq_apply( ctx );
893 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
894 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
895 0 : vinyl_mm_sync( ctx );
896 :
897 0 : dt += fd_log_wallclock();
898 0 : FD_LOG_INFO(( "vinyl revert_full took %g seconds", (double)dt/1e9 ));
899 0 : }
900 :
901 : void
902 0 : fd_snapwm_vinyl_revert_incr( fd_snapwm_tile_t * ctx ) {
903 0 : FD_CRIT( ctx->vinyl.txn_active, "txn_commit called while not in txn" );
904 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
905 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
906 :
907 0 : long dt = -fd_log_wallclock();
908 :
909 : /* Finish any outstanding writes */
910 :
911 0 : int commit_err = fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
912 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
913 :
914 : /* Hint to kernel to start prefetching to speed up reads */
915 :
916 0 : uchar * mmio = fd_vinyl_mmio ( io ); FD_TEST( mmio );
917 0 : ulong mmio_sz = fd_vinyl_mmio_sz( io );
918 :
919 0 : ulong txn_seq0 = ctx->vinyl.recovery.seq_present;
920 0 : ulong txn_seq1 = ctx->vinyl.io_mm->seq_present;
921 0 : FD_LOG_INFO(( "vinyl meta_recovery starting for seq [%lu,%lu)", txn_seq0, txn_seq1 ));
922 0 : ulong txn_sz = txn_seq1-txn_seq0;
923 0 : FD_CRIT( fd_vinyl_seq_le( txn_seq0, txn_seq1 ), "invalid txn seq range" );
924 0 : FD_CRIT( txn_seq1 <= mmio_sz, "invalid txn seq range" );
925 0 : if( FD_UNLIKELY( fd_vinyl_seq_eq( txn_seq0, txn_seq1 ) ) ) return;
926 :
927 0 : void * madv_base = (void *)fd_ulong_align_dn( (ulong)mmio+txn_seq0, FD_SHMEM_NORMAL_PAGE_SZ );
928 0 : ulong madv_sz = /* */fd_ulong_align_up( txn_sz, FD_SHMEM_NORMAL_PAGE_SZ );
929 0 : if( FD_UNLIKELY( madvise( madv_base, madv_sz, MADV_SEQUENTIAL ) ) ) {
930 0 : FD_LOG_WARNING(( "madvise(addr=%p,sz=%lu,MADV_SEQUENTIAL) failed (%i-%s)",
931 0 : madv_base, madv_sz,
932 0 : errno, fd_io_strerror( errno ) ));
933 0 : }
934 :
935 0 : fd_vinyl_meta_t * meta_map = ctx->vinyl.map;
936 0 : for( ulong seq=txn_seq0; fd_vinyl_seq_lt( seq, txn_seq1 ); ) {
937 :
938 0 : fd_vinyl_bstream_block_t * incr_block = (void *)( mmio+seq );
939 :
940 : /* Speculatively read block info */
941 0 : ulong ctl = FD_VOLATILE_CONST( incr_block->ctl );
942 0 : fd_vinyl_bstream_phdr_t incr_phdr = FD_VOLATILE_CONST( incr_block->phdr );
943 :
944 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
945 0 : int block_type = fd_vinyl_bstream_ctl_type( ctl );
946 0 : ulong block_sz;
947 :
948 0 : if( FD_LIKELY( block_type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR ) ) {
949 0 : block_sz = fd_vinyl_bstream_pair_sz( val_esz );
950 0 : ulong memo = fd_vinyl_key_memo( meta_map->seed, &incr_phdr.key );
951 :
952 : /* recovery_seq must be read from the bstream pair, and not from
953 : the meta map ele, because ele->hdr.info and phdr.info may
954 : start disagreeing on this value as the recovery proceeds.
955 : Consider what happens when there are multiple duplicates for
956 : the same account in the incremental snapshot. */
957 0 : ulong recovery_seq = fd_snapin_vinyl_pair_info_recovery_seq( &incr_phdr.info );
958 :
959 : /* query the meta map element. */
960 0 : ulong found_ele_idx = 0UL;
961 0 : int found_ele = !fd_vinyl_meta_query_fast( meta_map->ele /*ele0*/,
962 0 : meta_map->ele_max,
963 0 : &incr_phdr.key,
964 0 : memo,
965 0 : &found_ele_idx );
966 :
967 : /* Consider these two generic cases, labeled A and B:
968 :
969 : bstream: [ full | incr | free )
970 : revert: (*)->.......)
971 : case A : [ A0 | A1 A2 | )
972 : case B : [ | B1 B2 | )
973 :
974 : with these pair -> recovery_seq:
975 : A0 -> 0 (sentinel)
976 : A1 -> A0
977 : A2 -> A1
978 :
979 : B1 -> 0 (sentinel)
980 : B2 -> B1
981 :
982 : Cases A1 and B1 have a recovery_seq in the full snapshot range,
983 : and are processed below in the "if" branch. Cases A2 and B2
984 : have a recovery_seq in the incr range, and are processed in the
985 : "else" branch. In these 4 cases, the corresponding bstream
986 : pair will be cleared.
987 :
988 : Note that bstream pairs are read/processed from left to right,
989 : i.e. A1 then A2, or B1 then B2.
990 :
991 : Case A1: the meta map element needs to be updated with bstream
992 : seq A0.
993 :
994 : Case B1: this account (bstream pair) was introduced during incr
995 : snapshot load, and should be discarded. Its meta map
996 : element is therefore freed.
997 :
998 : Case A2: its recovery_seq in the bstream pair's info points to
999 : A1, but the meta map element has already been updated
1000 : to A0. In this case, the meta map element exists,
1001 : and it is necessary to verify that the meta map
1002 : element's seq points to a bstream seq in the full
1003 : snapshot range. A2 is then discarded.
1004 :
1005 : Case B2: its recovery_seq in the bstream pair's info points to
1006 : B1, but the meta map element has already been freed.
1007 : In this case there is nothing else to do.
1008 : */
1009 0 : if( FD_LIKELY( recovery_seq<ctx->vinyl.recovery.seq_present ) ) {
1010 : /* The meta map element must exist. */
1011 0 : if( FD_UNLIKELY( !found_ele ) ) {
1012 0 : FD_BASE58_ENCODE_32_BYTES( incr_phdr.key.uc, phdr_key_b58 );
1013 0 : FD_LOG_CRIT(( "element seq %lu for key %s memo %016lx not found", seq, phdr_key_b58, memo ));
1014 0 : }
1015 :
1016 0 : fd_vinyl_meta_ele_t * ele = meta_map->ele + found_ele_idx;
1017 :
1018 : /* The meta map element must be in use. */
1019 0 : if( !fd_vinyl_meta_ele_in_use( ele ) ) {
1020 0 : FD_BASE58_ENCODE_32_BYTES( incr_phdr.key.uc, phdr_key_b58 );
1021 0 : FD_LOG_CRIT(( "element seq %lu for key %s memo %016lx not in use", seq, phdr_key_b58, memo ));
1022 0 : }
1023 :
1024 : /* Either free the meta map element or update it. */
1025 0 : if( FD_UNLIKELY( !recovery_seq ) ) {
1026 0 : fd_vinyl_meta_private_ele_free( meta_map->ctx, ele );
1027 0 : } else {
1028 0 : fd_vinyl_bstream_block_t * full_block = (void *)( mmio+recovery_seq );
1029 0 : fd_vinyl_bstream_phdr_t full_phdr = FD_VOLATILE_CONST( full_block->phdr );
1030 0 : ulong incr_slot = fd_snapin_vinyl_pair_info_slot( &incr_phdr.info );
1031 0 : ulong full_slot = fd_snapin_vinyl_pair_info_slot( &full_phdr.info );
1032 :
1033 0 : if( FD_UNLIKELY( full_slot>=incr_slot ) ) {
1034 0 : FD_LOG_CRIT(( "revert incremental snapshot full_slot %lu >= incr_slot %lu", full_slot, incr_slot ));
1035 0 : }
1036 :
1037 : /* Update meta map element. */
1038 0 : ele->memo = fd_vinyl_key_memo( meta_map->seed, &full_phdr.key );
1039 0 : ele->phdr = full_phdr;
1040 0 : ele->seq = recovery_seq;
1041 0 : ele->line_idx = ULONG_MAX;
1042 0 : }
1043 0 : } else{
1044 : /* Only if the meta map element exists, verify that its
1045 : recovery_seq points to a seq inside the full snapshot range. */
1046 0 : if( FD_UNLIKELY( found_ele ) ) {
1047 0 : fd_vinyl_meta_ele_t * ele = meta_map->ele + found_ele_idx;
1048 0 : if( !fd_vinyl_meta_ele_in_use( ele ) ) {
1049 0 : FD_BASE58_ENCODE_32_BYTES( incr_phdr.key.uc, phdr_key_b58 );
1050 0 : FD_LOG_CRIT(( "element seq %lu for key %s memo %016lx not in use", seq, phdr_key_b58, memo ));
1051 0 : }
1052 0 : ulong ele_recovery_seq = fd_snapin_vinyl_pair_info_recovery_seq( &ele->phdr.info );
1053 :
1054 0 : if( FD_UNLIKELY( ele_recovery_seq>=ctx->vinyl.recovery.seq_present ) ) {
1055 0 : FD_BASE58_ENCODE_32_BYTES( incr_phdr.key.uc, phdr_key_b58 );
1056 0 : FD_LOG_CRIT(( "element seq %lu for key %s memo %016lx recovery_seq %lu with ele_recovery_seq %lu in the incr region", seq, phdr_key_b58, memo, recovery_seq, ele_recovery_seq ));
1057 0 : }
1058 0 : }
1059 0 : }
1060 : /* Verbose bstream cleanup. Even though the sync block will be
1061 : updated as well before returning, it is preferred to zero out
1062 : all incremental pairs that are being deprecated. */
1063 0 : fd_memset( incr_block, 0, block_sz );
1064 0 : } else if( block_type==FD_VINYL_BSTREAM_CTL_TYPE_ZPAD ) {
1065 0 : block_sz = FD_VINYL_BSTREAM_BLOCK_SZ;
1066 0 : } else {
1067 0 : FD_LOG_CRIT(( "unexpected block type %d", block_type ));
1068 0 : }
1069 :
1070 0 : if( FD_UNLIKELY( !block_sz ) ) {
1071 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx (zero block_sz)", seq, ctl ));
1072 0 : }
1073 0 : if( FD_UNLIKELY( block_sz > 64UL<<20 ) ) {
1074 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx, block_sz=%lu (unreasonably large block size)", seq, ctl, block_sz ));
1075 0 : }
1076 :
1077 0 : seq += block_sz;
1078 0 : }
1079 :
1080 : /* Apply changes and resync */
1081 0 : fd_snapwm_vinyl_recovery_seq_apply( ctx );
1082 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
1083 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
1084 0 : vinyl_mm_sync( ctx );
1085 :
1086 0 : dt += fd_log_wallclock();
1087 0 : FD_LOG_INFO(( "vinyl revert_incr took %g seconds", (double)dt/1e9 ));
1088 0 : }
1089 :
1090 : void
1091 : fd_snapin_vinyl_pair_info_from_parts( fd_vinyl_info_t * info,
1092 : ulong val_sz,
1093 : ulong recovery_seq,
1094 0 : ulong slot ) {
1095 0 : ulong enc_seq = recovery_seq >> FD_VINYL_BSTREAM_BLOCK_LG_SZ;
1096 0 : ulong ul0 = ( ( enc_seq<<32 ) ) | ( ( val_sz<<32 )>>32);
1097 0 : ulong ul1 = ( ( enc_seq>>32 )<<48 ) | ( ( slot<<16 )>>16);
1098 0 : info->ul[ 0 ] = ul0;
1099 0 : info->ul[ 1 ] = ul1;
1100 0 : }
1101 :
1102 : void
1103 : fd_snapin_vinyl_pair_info_update_recovery_seq( fd_vinyl_info_t * info,
1104 0 : ulong recovery_seq ) {
1105 0 : fd_snapin_vinyl_pair_info_from_parts( info,
1106 0 : fd_snapin_vinyl_pair_info_val_sz( info ),
1107 0 : recovery_seq,
1108 0 : fd_snapin_vinyl_pair_info_slot( info ) );
1109 0 : }
1110 :
1111 : ulong
1112 0 : fd_snapin_vinyl_pair_info_val_sz ( fd_vinyl_info_t const * info ) {
1113 0 : return (ulong)info->ui[0];
1114 0 : }
1115 :
1116 : ulong
1117 0 : fd_snapin_vinyl_pair_info_recovery_seq( fd_vinyl_info_t const * info ) {
1118 0 : ulong enc_seq0 = info->ul[ 0 ];
1119 0 : ulong enc_seq1 = info->ul[ 1 ];
1120 0 : ulong enc_seq = ( ( enc_seq1>>48 )<<32 ) | ( enc_seq0>>32 );
1121 0 : ulong recovery_seq = enc_seq << FD_VINYL_BSTREAM_BLOCK_LG_SZ;
1122 0 : return recovery_seq;
1123 0 : }
1124 :
1125 : ulong
1126 0 : fd_snapin_vinyl_pair_info_slot( fd_vinyl_info_t const * info ) {
1127 0 : ulong slot = info->ul[ 1 ];
1128 0 : slot = ( slot<<16 )>>16;
1129 0 : return slot;
1130 0 : }
|