Line data Source code
1 : #include "fd_snaplv_tile_private.h"
2 : #include "../../disco/topo/fd_topo.h"
3 : #include "../../disco/metrics/fd_metrics.h"
4 : #include "../../ballet/lthash/fd_lthash.h"
5 : #include "../../util/pod/fd_pod.h"
6 : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
7 : #include "generated/fd_snaplv_tile_seccomp.h"
8 :
9 : #include "utils/fd_ssctrl.h"
10 : #include "utils/fd_vinyl_admin.h"
11 :
/* Tile name, used for topology lookups and in log messages.  It is
   #undef'd at the end of the file. */
#define NAME "snaplv"

/* Kind of each input link, recorded per in_idx in ctx->in_kind. */
#define IN_KIND_SNAPWM (0)  /* the "snapwm_lv" link */
#define IN_KIND_SNAPLH (1)  /* the "snaplh_lv" links */

/* Capacity of ctx->in_kind.  Must hold one snapwm link plus up to
   FD_SNAPSHOT_MAX_SNAPLH_TILES snaplh links. */
#define MAX_IN_LINKS (16UL)

/* Slots of ctx->out_link. */
#define OUT_LINK_CNT (3UL)
#define OUT_LINK_LH  (0)  /* the "snaplv_lh" link */
#define OUT_LINK_CT  (1)  /* the "snaplv_ct" link */
#define OUT_LINK_WR  (2)  /* NOTE(review): not referenced elsewhere in this file */
22 :
23 : struct out_link {
24 : ulong idx;
25 : fd_wksp_t * mem;
26 : ulong chunk0;
27 : ulong wmark;
28 : ulong chunk;
29 : };
30 : typedef struct out_link out_link_t;
31 :
32 : struct fd_snaplv_tile {
33 : uint state;
34 : int full;
35 :
36 : ulong num_hash_tiles;
37 : ulong num_write_tiles;
38 :
39 : uchar in_kind[MAX_IN_LINKS];
40 : ulong adder_in_offset;
41 :
42 : out_link_t out_link[OUT_LINK_CNT];
43 :
44 : long running_capitalization;
45 : long dup_capitalization;
46 : ulong manifest_capitalization;
47 :
48 : struct {
49 : ulong bstream_seq_last;
50 : struct {
51 : int active[FD_SNAPLV_DUP_PENDING_CNT_MAX];
52 : ulong seq [FD_SNAPLV_DUP_PENDING_CNT_MAX];
53 : fd_vinyl_bstream_phdr_t phdr [FD_SNAPLV_DUP_PENDING_CNT_MAX];
54 : } pending;
55 : ulong pending_cnt;
56 : fd_vinyl_admin_t * admin;
57 : } vinyl;
58 :
59 : struct {
60 : fd_lthash_value_t expected_lthash;
61 : fd_lthash_value_t calculated_lthash;
62 : ulong received_lthashes;
63 : ulong ack_sig;
64 : int awaiting_results;
65 : int hash_check_done;
66 : } hash_accum;
67 :
68 : struct {
69 : ulong exp_sig;
70 : ulong ack_cnt;
71 : int wait;
72 : } fail;
73 :
74 : struct {
75 : fd_lthash_value_t full_lthash;
76 : long capitalization;
77 : } recovery;
78 :
79 : struct {
80 : struct {
81 : ulong duplicate_accounts_hashed;
82 : } full;
83 :
84 : struct {
85 : ulong duplicate_accounts_hashed;
86 : } incremental;
87 : } metrics;
88 :
89 : struct {
90 : fd_wksp_t * wksp;
91 : ulong chunk0;
92 : ulong wmark;
93 : ulong mtu;
94 : ulong pos;
95 : } in;
96 :
97 : struct {
98 : fd_wksp_t * wksp;
99 : ulong chunk0;
100 : ulong wmark;
101 : ulong mtu;
102 : } adder_in[FD_SNAPSHOT_MAX_SNAPLH_TILES];
103 : };
104 :
105 : typedef struct fd_snaplv_tile fd_snaplv_t;
106 :
107 : static inline int
108 0 : should_shutdown( fd_snaplv_t * ctx ) {
109 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
110 0 : }
111 :
112 : static ulong
113 0 : scratch_align( void ) {
114 0 : return alignof(fd_snaplv_t);
115 0 : }
116 :
117 : static ulong
118 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
119 0 : (void)tile;
120 0 : ulong l = FD_LAYOUT_INIT;
121 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snaplv_t), sizeof(fd_snaplv_t) );
122 0 : return FD_LAYOUT_FINI( l, alignof(fd_snaplv_t) );
123 0 : }
124 :
125 : static void
126 0 : metrics_write( fd_snaplv_t * ctx ) {
127 0 : (void)ctx;
128 0 : FD_MGAUGE_SET( SNAPLV, FULL_DUPLICATE_ACCOUNTS_HASHED, ctx->metrics.full.duplicate_accounts_hashed );
129 0 : FD_MGAUGE_SET( SNAPLV, INCREMENTAL_DUPLICATE_ACCOUNTS_HASHED, ctx->metrics.incremental.duplicate_accounts_hashed );
130 0 : FD_MGAUGE_SET( SNAPLV, STATE, (ulong)(ctx->state) );
131 0 : }
132 :
133 : static void
134 : transition_malformed( fd_snaplv_t * ctx,
135 0 : fd_stem_context_t * stem ) {
136 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) return;
137 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
138 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
139 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
140 0 : }
141 :
142 : static void
143 : handle_vinyl_lthash_request( fd_snaplv_t * ctx,
144 : fd_stem_context_t * stem,
145 : ulong seq,
146 0 : fd_vinyl_bstream_phdr_t * acc_hdr ) {
147 :
148 0 : out_link_t * o_link = &ctx->out_link[ OUT_LINK_LH ];
149 0 : uchar * data = fd_chunk_to_laddr( o_link->mem, o_link->chunk );
150 0 : memcpy( data, &seq, sizeof(ulong) );
151 0 : memcpy( data + sizeof(ulong), acc_hdr, sizeof(fd_vinyl_bstream_phdr_t) );
152 0 : ulong data_sz = sizeof(ulong)+sizeof(fd_vinyl_bstream_phdr_t);
153 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH, o_link->chunk, data_sz, 0UL, 0UL, 0UL );
154 0 : o_link->chunk = fd_dcache_compact_next( o_link->chunk, data_sz, o_link->chunk0, o_link->wmark );
155 :
156 0 : if( ctx->full ) ctx->metrics.full.duplicate_accounts_hashed++;
157 0 : else ctx->metrics.incremental.duplicate_accounts_hashed++;
158 0 : }
159 :
160 : static inline void
161 0 : handle_vinyl_lthash_seq_sync( fd_snaplv_t * ctx ) {
162 0 : ulong bstream_seq_min = ULONG_MAX;
163 0 : for( ulong i=0; i<ctx->num_write_tiles; i++ ) {
164 : /* There is a way to avoid a lock here: every wr_seq[i] is a ulong,
165 : each assigned to a unique write tile, and it works the same way
166 : as a stem's fseq or an mcache's seq. Therefore, from the point
167 : of view snaplv, it can directly read them at any time.
168 : Only snapwm is allowed to overwrite the wr_seq array during the
169 : initialization of a full/incr snapshot, but it does so after
170 : synchronizing with the write tiles (making sure that they have
171 : already completed all pending writes) and before instructing
172 : snaplv to start processing the snapshot. */
173 0 : ulong bstream_seq = fd_vinyl_admin_ulong_query( &ctx->vinyl.admin->wr_seq[ i ] );
174 0 : bstream_seq_min = fd_ulong_min( bstream_seq_min, bstream_seq );
175 0 : }
176 0 : ctx->vinyl.bstream_seq_last = bstream_seq_min;
177 0 : }
178 :
179 : static inline int
180 : handle_vinyl_lthash_seq_check_fast( fd_snaplv_t * ctx,
181 0 : ulong seq ) {
182 0 : return seq < ctx->vinyl.bstream_seq_last;
183 0 : }
184 :
185 : static inline void
186 : handle_vinyl_lthash_seq_check_until_match( fd_snaplv_t * ctx,
187 : ulong seq,
188 0 : int do_sleep ) {
189 0 : for(;;) {
190 0 : if( handle_vinyl_lthash_seq_check_fast( ctx, seq ) ) break;
191 0 : FD_SPIN_PAUSE();
192 0 : if( do_sleep ) fd_log_sleep( (long)1e3 ); /* 1 microsecond */
193 0 : handle_vinyl_lthash_seq_sync( ctx );
194 0 : }
195 0 : }
196 :
197 : static inline void
198 : handle_vinyl_lthash_request_drain_all( fd_snaplv_t * ctx,
199 0 : fd_stem_context_t * stem ) {
200 0 : for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
201 0 : if( !ctx->vinyl.pending.active[ i ] ) continue;
202 0 : handle_vinyl_lthash_seq_check_until_match( ctx, ctx->vinyl.pending.seq[ i ], 1/*do_sleep*/ );
203 0 : handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ i ], &ctx->vinyl.pending.phdr[ i ] );
204 0 : ctx->vinyl.pending.active[ i ] = 0;
205 0 : ctx->vinyl.pending_cnt--;
206 0 : }
207 0 : FD_TEST( !ctx->vinyl.pending_cnt );
208 0 : }
209 :
210 : static inline void
211 : handle_vinyl_lthash_pending_list( fd_snaplv_t * ctx,
212 0 : fd_stem_context_t * stem ) {
213 : /* Try to consume as many pending requests as possible. */
214 0 : for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
215 0 : if( FD_LIKELY( !ctx->vinyl.pending.active[ i ] ) ) continue;
216 0 : if( handle_vinyl_lthash_seq_check_fast( ctx, ctx->vinyl.pending.seq[ i ] ) ) {
217 0 : handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ i ], &ctx->vinyl.pending.phdr[ i ] );
218 0 : ctx->vinyl.pending.active[ i ] = 0;
219 0 : ctx->vinyl.pending_cnt--;
220 0 : }
221 0 : }
222 0 : }
223 :
224 : static void
225 : handle_data_frag( fd_snaplv_t * ctx,
226 : fd_stem_context_t * stem,
227 : ulong sig,
228 : ulong chunk,
229 : ulong sz,
230 0 : ulong tspub ) {
231 0 : (void)chunk; (void)sz;
232 :
233 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
234 : /* skip all data frags when in error state. */
235 0 : return;
236 0 : }
237 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
238 0 : FD_LOG_ERR(( "received data frag %s (%lu) while in unexpected state %s (%u)",
239 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
240 0 : fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
241 0 : return;
242 0 : }
243 0 : if( FD_UNLIKELY( sig!=FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) {
244 0 : FD_LOG_ERR(( "received incorrect data frag %s (%lu) in state %s (%u)",
245 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
246 0 : fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
247 0 : return;
248 0 : }
249 :
250 0 : uchar const * in_data = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
251 :
252 0 : ulong const acc_sz = sizeof(ulong)+sizeof(fd_vinyl_bstream_phdr_t);
253 0 : ulong const batch_sz = sz;
254 0 : ulong const batch_cnt = tspub;
255 0 : if( FD_UNLIKELY( batch_cnt>FD_SNAPLV_DUP_BATCH_IN_CNT_MAX ) ) {
256 0 : FD_LOG_CRIT(( "batch count %lu exceeds batch count max %lu", batch_cnt, FD_SNAPLV_DUP_BATCH_IN_CNT_MAX ));
257 0 : }
258 0 : if( FD_UNLIKELY( (batch_cnt*acc_sz)!=batch_sz ) ) {
259 0 : FD_LOG_CRIT(( "batch count %lu with account size %lu does not match batch size %lu", batch_cnt, acc_sz, batch_sz ));
260 0 : }
261 :
262 0 : for( ulong acc_i=0UL; acc_i<batch_cnt; acc_i++ ) {
263 :
264 : /* move in_data pointer forward */
265 0 : uchar const * acc_data = in_data;
266 0 : in_data += acc_sz;
267 :
268 0 : ulong acc_data_seq = fd_ulong_load_8( acc_data );
269 :
270 0 : if( FD_LIKELY( handle_vinyl_lthash_seq_check_fast( ctx, acc_data_seq ) ) ) {
271 : /* The request can be processed immediately, skipping the pending list. */
272 0 : fd_vinyl_bstream_phdr_t acc_data_phdr[1];
273 0 : memcpy( acc_data_phdr, acc_data + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
274 0 : handle_vinyl_lthash_request( ctx, stem, acc_data_seq, acc_data_phdr );
275 0 : continue;
276 0 : }
277 :
278 : /* Find an empty slot in the pending list. */
279 0 : ulong seq_min_i = ULONG_MAX;
280 0 : ulong seq_min = ULONG_MAX;
281 0 : ulong free_i = ULONG_MAX;
282 0 : if( FD_UNLIKELY( ctx->vinyl.pending_cnt==FD_SNAPLV_DUP_PENDING_CNT_MAX ) ) {
283 : /* an entry must be consumed to free a slot */
284 0 : for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
285 0 : ulong seq = ctx->vinyl.pending.seq[ i ];
286 0 : seq_min_i = fd_ulong_if( seq_min > seq, i, seq_min_i );
287 0 : seq_min = fd_ulong_min( seq_min, seq );
288 0 : }
289 0 : handle_vinyl_lthash_seq_check_until_match( ctx, ctx->vinyl.pending.seq[ seq_min_i ], 1/*do_sleep*/ );
290 0 : handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ seq_min_i ], &ctx->vinyl.pending.phdr[ seq_min_i ] );
291 0 : ctx->vinyl.pending.active[ seq_min_i ] = 0;
292 0 : ctx->vinyl.pending_cnt--;
293 0 : free_i = seq_min_i;
294 0 : } else {
295 : /* Pick a free slot. */
296 0 : free_i = 0UL;
297 0 : for( ; free_i<FD_SNAPLV_DUP_PENDING_CNT_MAX; free_i++ ) {
298 0 : if( !ctx->vinyl.pending.active[ free_i ] ) break;
299 0 : }
300 0 : }
301 :
302 : /* Populate the free slot. */
303 0 : ctx->vinyl.pending.seq[ free_i ] = acc_data_seq;
304 0 : memcpy( &ctx->vinyl.pending.phdr[ free_i ], acc_data + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
305 0 : ctx->vinyl.pending.active[ free_i ] = 1;
306 0 : ctx->vinyl.pending_cnt++;
307 0 : }
308 0 : }
309 :
310 : static void
311 : handle_control_frag( fd_snaplv_t * ctx,
312 : fd_stem_context_t * stem,
313 : ulong sig,
314 : ulong in_idx,
315 : ulong tsorig,
316 0 : ulong tspub ) {
317 0 : (void)in_idx;
318 :
319 0 : if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLH ) {
320 0 : if( FD_UNLIKELY( !ctx->fail.wait ) ) {
321 0 : FD_LOG_CRIT(( "received unexpected control frag %s (%lu) from snaplh in state %s (%u)",
322 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
323 0 : fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
324 0 : }
325 0 : if( FD_UNLIKELY( sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) ) {
326 0 : FD_LOG_CRIT(( "received incorrect control frag %s (%lu) from snaplh in state %s (%u)",
327 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
328 0 : fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
329 0 : }
330 0 : ctx->fail.ack_cnt++;
331 0 : return;
332 0 : }
333 :
334 0 : if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
335 : /* Control messages move along the snapshot load pipeline. Since
336 : error conditions can be triggered by any tile in the pipeline,
337 : it is possible to be in error state and still receive otherwise
338 : valid messages. Only a fail message can revert this. */
339 0 : return;
340 0 : };
341 :
342 0 : int forward_to_ct = 1;
343 :
344 0 : switch( sig ) {
345 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
346 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
347 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
348 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
349 0 : ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
350 :
351 0 : if( sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL ) {
352 0 : fd_lthash_zero( &ctx->hash_accum.calculated_lthash );
353 0 : fd_lthash_zero( &ctx->recovery.full_lthash );
354 0 : ctx->running_capitalization = 0L;
355 0 : ctx->dup_capitalization = 0L;
356 0 : } else {
357 : /* The lthash for the incremental snapshot is computed starting
358 : from the full snapshot lthash. Since an init message may
359 : be received after a fail message, always start from the
360 : recovery value. */
361 0 : ctx->hash_accum.calculated_lthash = ctx->recovery.full_lthash;
362 0 : ctx->running_capitalization = ctx->recovery.capitalization;
363 0 : ctx->dup_capitalization = 0L;
364 0 : }
365 :
366 0 : break;
367 0 : }
368 :
369 0 : case FD_SNAPSHOT_MSG_CTRL_FINI: {
370 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
371 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
372 0 : ctx->hash_accum.ack_sig = sig;
373 0 : ctx->hash_accum.awaiting_results = 1;
374 0 : handle_vinyl_lthash_request_drain_all( ctx, stem );
375 0 : forward_to_ct = 0;
376 0 : break; /* the ack is sent when all hashes are received */
377 0 : }
378 :
379 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
380 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
381 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
382 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
383 :
384 : /* back up full_lthash for future recovery. */
385 0 : if( sig==FD_SNAPSHOT_MSG_CTRL_NEXT ) {
386 0 : ctx->recovery.full_lthash = ctx->hash_accum.calculated_lthash;
387 0 : ctx->recovery.capitalization = ctx->running_capitalization;
388 0 : }
389 0 : break;
390 0 : }
391 :
392 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR: {
393 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
394 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
395 0 : break;
396 0 : }
397 :
398 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL: {
399 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
400 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
401 0 : ctx->fail.exp_sig = FD_SNAPSHOT_MSG_CTRL_FAIL;
402 0 : ctx->fail.ack_cnt = 0UL;
403 0 : ctx->fail.wait = 1;
404 0 : forward_to_ct = 0;
405 0 : break;
406 0 : }
407 :
408 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: {
409 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
410 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
411 0 : break;
412 0 : }
413 :
414 0 : default: {
415 0 : FD_LOG_ERR(( "unexpected control frag %s (%lu) in state %s (%u)",
416 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
417 0 : fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
418 0 : break;
419 0 : }
420 0 : }
421 :
422 : /* Forward the control message down the pipeline */
423 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
424 0 : if( !forward_to_ct ) return;
425 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
426 0 : }
427 :
428 : static void
429 : handle_hash_frag( fd_snaplv_t * ctx,
430 : fd_stem_context_t * stem,
431 : ulong in_idx,
432 : ulong sig,
433 : ulong chunk,
434 0 : ulong sz ) {
435 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
436 : /* skip all hash frags when in error state. */
437 0 : return;
438 0 : }
439 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING &&
440 0 : ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) {
441 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM) ) {
442 0 : FD_LOG_WARNING(( "received invalid data frag %s (%lu) from snapwm in state %s (%u)",
443 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
444 0 : fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
445 0 : transition_malformed( ctx, stem );
446 0 : }
447 0 : return;
448 0 : }
449 0 : switch( sig ) {
450 0 : case FD_SNAPSHOT_HASH_MSG_RESULT_ADD: {
451 0 : FD_TEST( sz==sizeof(fd_ssctrl_hash_result_t) );
452 0 : FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLH );
453 0 : fd_ssctrl_hash_result_t const * result = fd_chunk_to_laddr_const( ctx->adder_in[ in_idx-ctx->adder_in_offset ].wksp, chunk );
454 0 : fd_lthash_add( &ctx->hash_accum.calculated_lthash, &result->lthash );
455 0 : ctx->running_capitalization = fd_long_sat_add( ctx->running_capitalization, result->capitalization );
456 0 : if( FD_UNLIKELY( ctx->running_capitalization==LONG_MAX ) ) {
457 0 : FD_LOG_WARNING(( "capitalization overflow, running_capitalization=%ld, result_capitalization=%ld", ctx->running_capitalization, result->capitalization ));
458 0 : transition_malformed( ctx, stem );
459 0 : return;
460 0 : }
461 0 : ctx->hash_accum.received_lthashes++;
462 0 : break;
463 0 : }
464 0 : case FD_SNAPSHOT_HASH_MSG_RESULT_SUB: {
465 0 : FD_TEST( sz==sizeof(fd_ssctrl_hash_result_t) );
466 0 : FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM );
467 0 : fd_ssctrl_hash_result_t const * result = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
468 0 : fd_lthash_sub( &ctx->hash_accum.calculated_lthash, &result->lthash );
469 0 : FD_TEST( result->capitalization>=0L );
470 0 : ctx->dup_capitalization = fd_long_sat_add( ctx->dup_capitalization, result->capitalization );
471 0 : break;
472 0 : }
473 0 : case FD_SNAPSHOT_HASH_MSG_EXPECTED: {
474 0 : FD_TEST( sz==sizeof(fd_lthash_value_t) );
475 0 : FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM );
476 0 : fd_lthash_value_t const * result = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
477 0 : ctx->hash_accum.expected_lthash = *result;
478 0 : break;
479 0 : }
480 0 : default: {
481 0 : FD_LOG_ERR(( "unexpected hash frag %s (%lu) in state %s (%lu)",
482 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
483 0 : fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
484 0 : break;
485 0 : }
486 0 : }
487 0 : }
488 :
489 : static inline void
490 : handle_expected_capitalization_message( fd_snaplv_t * ctx,
491 : ulong chunk,
492 0 : ulong sz ) {
493 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
494 : /* skip all hash frags when in error state. */
495 0 : return;
496 0 : }
497 :
498 0 : if( FD_UNLIKELY( sz!=sizeof(fd_ssctrl_capitalization_t) ) ) {
499 0 : FD_LOG_ERR(( "unexpected msg sz %lu for sig FD_SNAPSHOT_MSG_EXP_CAPITALIZATION", sz ));
500 0 : return;
501 0 : }
502 :
503 0 : fd_ssctrl_capitalization_t const * expected_cap = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
504 0 : ctx->manifest_capitalization = expected_cap->capitalization;
505 0 : }
506 :
507 : static inline int
508 : returnable_frag( fd_snaplv_t * ctx,
509 : ulong in_idx,
510 : ulong seq FD_PARAM_UNUSED,
511 : ulong sig,
512 : ulong chunk,
513 : ulong sz,
514 : ulong ctl FD_PARAM_UNUSED,
515 : ulong tsorig,
516 : ulong tspub,
517 0 : fd_stem_context_t * stem ) {
518 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
519 :
520 0 : if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) handle_data_frag( ctx, stem, sig, chunk, sz, tspub );
521 0 : else if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_RESULT_ADD ||
522 0 : sig==FD_SNAPSHOT_HASH_MSG_RESULT_SUB ||
523 0 : sig==FD_SNAPSHOT_HASH_MSG_EXPECTED ) ) handle_hash_frag( ctx, stem, in_idx, sig, chunk, sz );
524 0 : else if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_EXP_CAPITALIZATION ) ) handle_expected_capitalization_message( ctx, chunk, sz );
525 0 : else handle_control_frag( ctx, stem, sig, in_idx, tsorig, tspub );
526 :
527 0 : return 0;
528 0 : }
529 :
530 : static void
531 : before_credit( fd_snaplv_t * ctx,
532 : fd_stem_context_t * stem FD_PARAM_UNUSED,
533 0 : int * charge_busy ) {
534 0 : *charge_busy = 0;
535 0 : handle_vinyl_lthash_seq_sync( ctx );
536 0 : }
537 :
538 : static void
539 : after_credit( fd_snaplv_t * ctx,
540 : fd_stem_context_t * stem,
541 : int * opt_poll_in FD_PARAM_UNUSED,
542 0 : int * charge_busy FD_PARAM_UNUSED ) {
543 :
544 0 : handle_vinyl_lthash_pending_list( ctx, stem );
545 :
546 0 : if( FD_UNLIKELY( ctx->hash_accum.awaiting_results && ctx->hash_accum.received_lthashes==ctx->num_hash_tiles ) ) {
547 :
548 0 : ctx->hash_accum.awaiting_results = 0;
549 0 : ctx->hash_accum.received_lthashes = 0UL;
550 0 : ctx->running_capitalization = fd_long_sat_sub( ctx->running_capitalization, ctx->dup_capitalization );
551 0 : if( FD_UNLIKELY( ctx->running_capitalization==LONG_MIN ) ) {
552 0 : FD_LOG_WARNING(( "capitalization underflow, running_capitalization=%ld, dup_capitalization=%ld", ctx->running_capitalization, ctx->dup_capitalization ));
553 0 : transition_malformed( ctx, stem );
554 0 : return;
555 0 : }
556 0 : if( FD_UNLIKELY( ctx->running_capitalization<0L ) ) {
557 0 : FD_LOG_WARNING(( "computed capitalization %ld is invalid", ctx->running_capitalization ));
558 0 : transition_malformed( ctx, stem );
559 0 : return;
560 0 : }
561 0 : ulong computed_capitalization = (ulong)ctx->running_capitalization;
562 0 : int capitalization_match = computed_capitalization==ctx->manifest_capitalization;
563 :
564 0 : int lthash_match = !memcmp( &ctx->hash_accum.expected_lthash, &ctx->hash_accum.calculated_lthash, sizeof(fd_lthash_value_t) );
565 :
566 0 : if( FD_UNLIKELY( !lthash_match ) ) {
567 : /* SnapshotError::MismatchedHash
568 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L479 */
569 0 : FD_LOG_WARNING(( "calculated accounts lthash %s does not match accounts lthash %s in %s snapshot manifest",
570 0 : FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.calculated_lthash ),
571 0 : FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.expected_lthash ),
572 0 : ctx->full?"full":"incremental" ));
573 0 : transition_malformed( ctx, stem );
574 0 : return;
575 0 : } else {
576 0 : FD_LOG_INFO(( "calculated accounts lthash %s matches accounts lthash %s in %s snapshot manifest",
577 0 : FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.calculated_lthash ),
578 0 : FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.expected_lthash ),
579 0 : ctx->full?"full":"incremental" ));
580 0 : }
581 :
582 0 : if( FD_UNLIKELY( !capitalization_match ) ) {
583 : /* SnapshotError::MismatchedCapitalization
584 : https://github.com/anza-xyz/agave/blob/v4.0.0-beta.2/runtime/src/snapshot_bank_utils.rs#L217 */
585 0 : FD_LOG_WARNING(( "%s snapshot manifest capitalization %lu does not match computed capitalization %lu",
586 0 : ctx->full?"full":"incremental", ctx->manifest_capitalization, computed_capitalization ));
587 0 : transition_malformed( ctx, stem );
588 0 : return;
589 0 : }
590 :
591 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, ctx->hash_accum.ack_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
592 0 : }
593 :
594 0 : if( FD_UNLIKELY( ctx->fail.wait && ctx->fail.ack_cnt==ctx->num_hash_tiles ) ) {
595 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, ctx->fail.exp_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
596 0 : ctx->fail.exp_sig = 0UL;
597 0 : ctx->fail.ack_cnt = 0UL;
598 0 : ctx->fail.wait = 0;
599 0 : return;
600 0 : }
601 0 : }
602 :
603 : static ulong
604 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
605 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
606 : ulong out_fds_cnt,
607 0 : int * out_fds ) {
608 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "unexpected out_fds_cnt %lu", out_fds_cnt ));
609 :
610 0 : ulong out_cnt = 0;
611 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
612 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
613 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
614 0 : }
615 0 : return out_cnt;
616 0 : }
617 :
618 : static ulong
619 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
620 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
621 : ulong out_cnt,
622 0 : struct sock_filter * out ) {
623 0 : populate_sock_filter_policy_fd_snaplv_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
624 0 : return sock_filter_policy_fd_snaplv_tile_instr_cnt;
625 0 : }
626 :
627 : static void
628 : unprivileged_init( fd_topo_t * topo,
629 0 : fd_topo_tile_t * tile ) {
630 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
631 :
632 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
633 0 : fd_snaplv_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplv_t), sizeof(fd_snaplv_t) );
634 :
635 0 : FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
636 :
637 0 : ulong expected_in_cnt = 1UL + fd_topo_tile_name_cnt( topo, "snaplh" );
638 0 : if( FD_UNLIKELY( tile->in_cnt!=expected_in_cnt ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu", tile->in_cnt, expected_in_cnt ));
639 0 : if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
640 :
641 0 : ulong adder_idx = 0UL;
642 0 : for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
643 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
644 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
645 :
646 0 : if( FD_LIKELY( 0==strcmp( in_link->name, "snapwm_lv" ) ) ) {
647 0 : ctx->in.wksp = in_wksp->wksp;
648 0 : ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
649 0 : ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
650 0 : ctx->in.mtu = in_link->mtu;
651 0 : ctx->in.pos = 0UL;
652 0 : ctx->in_kind[ i ] = IN_KIND_SNAPWM;
653 :
654 0 : } else if( FD_LIKELY( 0==strcmp( in_link->name, "snaplh_lv" ) ) ) {
655 0 : ctx->adder_in[ adder_idx ].wksp = in_wksp->wksp;
656 0 : ctx->adder_in[ adder_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->adder_in[ adder_idx ].wksp, in_link->dcache );
657 0 : ctx->adder_in[ adder_idx ].wmark = fd_dcache_compact_wmark ( ctx->adder_in[ adder_idx ].wksp, in_link->dcache, in_link->mtu );
658 0 : ctx->adder_in[ adder_idx ].mtu = in_link->mtu;
659 0 : ctx->in_kind[ i ] = IN_KIND_SNAPLH;
660 0 : if( FD_LIKELY( adder_idx==0UL ) ) ctx->adder_in_offset = i;
661 0 : adder_idx++;
662 :
663 0 : } else {
664 0 : FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
665 0 : }
666 0 : }
667 :
668 0 : ctx->vinyl.bstream_seq_last = 0UL;
669 :
670 0 : for( uint i=0U; i<(tile->out_cnt); i++ ) {
671 0 : fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
672 :
673 0 : if( 0==strcmp( link->name, "snaplv_ct" ) ) {
674 0 : out_link_t * o_link = &ctx->out_link[ OUT_LINK_CT ];
675 0 : o_link->idx = i;
676 0 : o_link->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
677 0 : o_link->chunk0 = 0UL;
678 0 : o_link->wmark = 0UL;
679 0 : o_link->chunk = 0UL;
680 :
681 0 : } else if( 0==strcmp( link->name, "snaplv_lh" ) ) {
682 0 : out_link_t * o_link = &ctx->out_link[ OUT_LINK_LH ];
683 0 : o_link->idx = i;
684 0 : o_link->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
685 0 : o_link->chunk0 = fd_dcache_compact_chunk0( o_link->mem, link->dcache );
686 0 : o_link->wmark = fd_dcache_compact_wmark( o_link->mem, link->dcache, link->mtu );
687 0 : o_link->chunk = o_link->chunk0;
688 :
689 0 : } else {
690 0 : FD_LOG_ERR(( "unexpected output link %s", link->name ));
691 0 : }
692 0 : }
693 :
694 0 : memset( ctx->vinyl.pending.active, 0, FD_SNAPLV_DUP_PENDING_CNT_MAX*sizeof(int) );
695 0 : ctx->vinyl.pending_cnt = 0;
696 :
697 0 : ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
698 0 : FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
699 0 : fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
700 0 : FD_TEST( vinyl_admin );
701 0 : ctx->vinyl.admin = vinyl_admin;
702 0 : for(;;) {
703 : /* This query can be done without the need of an rwlock. */
704 0 : ulong vinyl_admin_status = fd_vinyl_admin_ulong_query( &vinyl_admin->status );
705 0 : if( FD_LIKELY( vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_INIT_PENDING &&
706 0 : vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_ERROR ) ) break;
707 0 : fd_log_sleep( (long)1e6 /*1ms*/ );
708 0 : FD_SPIN_PAUSE();
709 0 : }
710 :
711 0 : ctx->metrics.full.duplicate_accounts_hashed = 0UL;
712 0 : ctx->metrics.incremental.duplicate_accounts_hashed = 0UL;
713 :
714 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
715 0 : ctx->full = 1;
716 :
717 0 : ctx->num_hash_tiles = fd_topo_tile_name_cnt( topo, "snaplh" );
718 0 : ctx->num_write_tiles = fd_topo_tile_name_cnt( topo, "snapwr" );
719 0 : FD_TEST( ctx->num_write_tiles<=FD_VINYL_ADMIN_WR_SEQ_CNT_MAX );
720 :
721 0 : ctx->hash_accum.received_lthashes = 0UL;
722 0 : ctx->hash_accum.awaiting_results = 0;
723 0 : ctx->hash_accum.hash_check_done = 0;
724 :
725 0 : fd_lthash_zero( &ctx->hash_accum.calculated_lthash );
726 0 : fd_lthash_zero( &ctx->recovery.full_lthash );
727 :
728 0 : ctx->recovery.capitalization = 0L;
729 0 : ctx->running_capitalization = 0L;
730 0 : ctx->dup_capitalization = 0L;
731 0 : ctx->manifest_capitalization = 0UL;
732 :
733 0 : ctx->fail.exp_sig = 0UL;
734 0 : ctx->fail.ack_cnt = 0UL;
735 0 : ctx->fail.wait = 0;
736 0 : }
737 :
738 0 : #define STEM_BURST (FD_SNAPLV_STEM_BURST)
739 0 : #define STEM_LAZY 1000L
740 :
741 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snaplv_t
742 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplv_t)
743 :
744 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
745 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
746 0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
747 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
748 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
749 :
750 :
751 : #include "../../disco/stem/fd_stem.c"
752 :
753 : fd_topo_run_tile_t fd_tile_snaplv = {
754 : .name = NAME,
755 : .populate_allowed_fds = populate_allowed_fds,
756 : .populate_allowed_seccomp = populate_allowed_seccomp,
757 : .scratch_align = scratch_align,
758 : .scratch_footprint = scratch_footprint,
759 : .unprivileged_init = unprivileged_init,
760 : .run = stem_run,
761 : };
762 :
763 : #undef NAME
|