Line data Source code
1 : /* fd_snapin_tile_funk.c contains APIs to load accounts into funk. */
2 :
3 : #include "fd_snapin_tile_private.h"
4 : #include "../../flamenco/accdb/fd_accdb_sync.h"
5 : #include "../../funk/fd_funk.h"
6 :
7 : int
8 : fd_snapin_process_account_header_funk( fd_snapin_tile_t * ctx,
9 0 : fd_ssparse_advance_result_t * result ) {
/* Loads one account header into funk.  Looks up any existing record
   for the pubkey (falling back to the root txn when loading an
   incremental snapshot), forwards ignored/replaced duplicates to the
   subtracting hash tile, then (re)allocates the record value and
   writes the account metadata.  On success ctx->acc_data points at
   the data region so subsequent data fragments can be appended.
   Returns non-zero (early_exit, as set by
   fd_snapin_send_duplicate_account) when the caller must back off. */
10 0 : fd_funk_t * funk = ctx->funk;
11 :
12 0 : fd_funk_rec_key_t id = FD_LOAD( fd_funk_rec_key_t, result->account_header.pubkey );
13 0 : fd_funk_rec_query_t query[1];
14 0 : fd_funk_rec_t * rec = fd_funk_rec_query_try( funk, ctx->xid, &id, query );
15 0 : fd_funk_rec_t const * existing_rec = rec;
16 :
17 0 : ctx->metrics.accounts_loaded++;
18 :
19 0 : int early_exit = 0;
/* Incremental snapshot: the account may already exist at the root
   txn.  NOTE(review): `query` is reused here, so neither speculative
   query can be test()-ed afterwards — presumably fine because no
   conflicting database writes happen during snapshot load (see the
   similar assumption in fd_snapin_read_account_funk). */
20 0 : if( !ctx->full && !existing_rec ) {
21 0 : existing_rec = fd_funk_rec_query_try( funk, fd_funk_root( funk ), &id, query );
22 0 : }
23 0 : if( FD_UNLIKELY( existing_rec ) ) {
24 0 : fd_account_meta_t * meta = fd_funk_val( existing_rec, funk->wksp );
25 0 : if( FD_UNLIKELY( meta ) ) {
/* Existing value is newer: ignore the incoming account, clear
   acc_data so trailing data fragments are dropped, and forward the
   incoming (skipped) account downstream. */
26 0 : if( FD_LIKELY( meta->slot>result->account_header.slot ) ) {
27 0 : ctx->acc_data = NULL;
28 0 : ctx->metrics.accounts_ignored++;
29 0 : fd_snapin_send_duplicate_account( ctx, result->account_header.lamports, NULL, result->account_header.data_len, (uchar)result->account_header.executable, result->account_header.owner, result->account_header.pubkey, 0, &early_exit );
30 0 : return early_exit;
31 0 : }
/* Incoming account replaces the existing one (equal slots also take
   this path, unlike the batch path which FD_TESTs on slot equality);
   forward the replaced value so its lamports can be subtracted. */
32 0 : ctx->metrics.accounts_replaced++;
33 0 : ctx->dup_capitalization = fd_ulong_sat_add( ctx->dup_capitalization, meta->lamports );
34 0 : fd_snapin_send_duplicate_account( ctx, meta->lamports, (uchar const *)meta + sizeof(fd_account_meta_t), meta->dlen, meta->executable, meta->owner, result->account_header.pubkey, 1, &early_exit);
35 0 : }
36 0 : }
37 :
/* No record in ctx->xid yet: prepare a new one and publish it at the
   end.  (A record found at the root txn still gets a fresh record in
   ctx->xid rather than being modified in place.) */
38 0 : int should_publish = 0;
39 0 : fd_funk_rec_prepare_t prepare[1];
40 0 : if( FD_LIKELY( !rec ) ) {
41 0 : should_publish = 1;
42 0 : rec = fd_funk_rec_prepare( funk, ctx->xid, &id, prepare, NULL );
43 0 : FD_TEST( rec );
44 0 : }
45 :
/* NOTE(review): this initializer is dead — meta is unconditionally
   overwritten by the allocation below. */
46 0 : fd_account_meta_t * meta = fd_funk_val( rec, funk->wksp );
47 : /* Allocate data space from heap, free old value (if any) */
48 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
/* NOTE(review): unlike streamlined_insert, data_len is not checked
   against FD_RUNTIME_ACC_SZ_MAX here — confirm the parser bounds it. */
49 0 : ulong const alloc_sz = sizeof(fd_account_meta_t)+result->account_header.data_len;
50 0 : ulong alloc_max;
51 0 : meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
52 0 : if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of memory while loading snapshot (increase [accounts.file_size_gib])" ));
53 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
/* Wire the freshly allocated region into the record (sizes clamped
   to the funk value-size field width). */
54 0 : rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
55 0 : rec->val_max = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
56 0 : rec->val_sz = (uint)( alloc_sz & FD_FUNK_REC_VAL_MAX );
57 :
58 0 : meta->dlen = (uint)result->account_header.data_len;
59 0 : meta->slot = result->account_header.slot;
60 0 : memcpy( meta->owner, result->account_header.owner, sizeof(fd_pubkey_t) );
61 0 : meta->lamports = result->account_header.lamports;
62 0 : meta->executable = (uchar)result->account_header.executable;
63 :
/* Data fragments arriving later are appended starting here */
64 0 : ctx->acc_data = (uchar*)meta + sizeof(fd_account_meta_t);
65 :
66 0 : ctx->capitalization = fd_ulong_sat_add( ctx->capitalization, result->account_header.lamports );
67 :
68 0 : if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( funk, prepare );
69 0 : return early_exit;
70 0 : }
71 :
72 : int
73 : fd_snapin_process_account_data_funk( fd_snapin_tile_t * ctx,
74 0 : fd_ssparse_advance_result_t * result ) {
75 0 : int early_exit = 0;
76 0 : if( FD_UNLIKELY( !ctx->acc_data ) ) {
77 0 : fd_snapin_send_duplicate_account_data( ctx, result->account_data.data, result->account_data.data_sz, &early_exit );
78 0 : return early_exit;
79 0 : }
80 :
81 0 : fd_memcpy( ctx->acc_data, result->account_data.data, result->account_data.data_sz );
82 0 : ctx->acc_data += result->account_data.data_sz;
83 0 : return 0;
84 0 : }
85 :
86 : /* streamlined_insert inserts an unfragmented account.
87 : Only used while loading a full snapshot, not an incremental. */
88 :
89 : static void
90 : streamlined_insert( fd_snapin_tile_t * ctx,
91 : fd_funk_rec_t * rec,
92 : uchar const * frame,
93 0 : ulong slot ) {
94 0 : ulong data_len = fd_ulong_load_8_fast( frame+0x08UL );
95 0 : ulong lamports = fd_ulong_load_8_fast( frame+0x30UL );
96 0 : ulong rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
97 0 : uchar owner[32]; memcpy( owner, frame+0x40UL, 32UL );
98 0 : _Bool executable = !!frame[ 0x60UL ];
99 :
100 0 : fd_funk_t * funk = ctx->funk;
101 0 : if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
102 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
103 0 : ulong const alloc_sz = sizeof(fd_account_meta_t)+data_len;
104 0 : ulong alloc_max;
105 0 : fd_account_meta_t * meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
106 0 : if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of memory while loading snapshot (increase [accounts.file_size_gib])" ));
107 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
108 0 : rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
109 0 : rec->val_max = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
110 0 : rec->val_sz = (uint)( alloc_sz & FD_FUNK_REC_VAL_MAX );
111 :
112 : /* Write metadata */
113 0 : meta->dlen = (uint)data_len;
114 0 : meta->slot = slot;
115 0 : memcpy( meta->owner, owner, sizeof(fd_pubkey_t) );
116 0 : meta->lamports = lamports;
117 0 : meta->executable = (uchar)executable;
118 :
119 : /* Write data */
120 0 : uchar * acc_data = (uchar *)( meta+1 );
121 0 : fd_memcpy( acc_data, frame+0x88UL, data_len );
122 :
123 : /* update capitalization */
124 0 : ctx->capitalization = fd_ulong_sat_add( ctx->capitalization, lamports );
125 0 : }
126 :
127 : /* process_account_batch is a happy path performance optimization
128 : handling insertion of lots of small accounts.
129 :
130 : The main optimization implemented for funk is doing hash map memory
131 : accesses in parallel to amortize DRAM latency. */
132 :
133 : int
134 : fd_snapin_process_account_batch_funk( fd_snapin_tile_t * ctx,
135 : fd_ssparse_advance_result_t * result,
136 0 : buffered_account_batch_t * buffered_batch ) {
/* result!=NULL: process a fresh batch from the parser.
   result==NULL: resume the batch previously saved in buffered_batch,
   starting at remaining_idx.  Returns non-zero (early_exit) when a
   duplicate account was forwarded downstream and the caller must
   back off before the rest of the batch is inserted; already-handled
   entries up to that point are still inserted before returning. */
137 0 : int early_exit = 0;
138 0 : ulong start_idx = result ? 0 : buffered_batch->remaining_idx;
139 0 : fd_funk_t * funk = ctx->funk;
140 0 : fd_funk_rec_map_t * rec_map = funk->rec_map;
141 0 : fd_funk_rec_t * rec_tbl = funk->rec_pool->ele;
142 0 : fd_funk_rec_map_shmem_private_chain_t * chain_tbl = fd_funk_rec_map_shmem_private_chain( rec_map->map, 0UL );
143 :
144 : /* Derive map chains */
/* NOTE(review): all loops run to FD_SSPARSE_ACC_BATCH_MAX, i.e. the
   batch is assumed to be completely filled on this path — confirm the
   parser only emits full batches here. */
145 0 : uint chain_idx[ FD_SSPARSE_ACC_BATCH_MAX ];
/* chain_cnt is a power of two, so (hash & (chain_cnt-1)) picks a chain */
146 0 : ulong chain_mask = rec_map->map->chain_cnt-1UL;
147 0 : for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
148 0 : uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
149 0 : uchar const * pubkey = frame+0x10UL;
150 0 : ulong memo = fd_funk_rec_key_hash1( pubkey, rec_map->map->seed );
151 0 : chain_idx[ i ] = (uint)( memo&chain_mask );
152 0 : }
153 :
154 : /* Parallel load hash chain heads */
155 0 : uint map_node [ FD_SSPARSE_ACC_BATCH_MAX ];
156 0 : uint chain_cnt[ FD_SSPARSE_ACC_BATCH_MAX ];
157 0 : for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
158 0 : map_node [ i ] = chain_tbl[ chain_idx[ i ] ].head_cidx;
159 0 : chain_cnt[ i ] = (uint)chain_tbl[ chain_idx[ i ] ].ver_cnt;
160 0 : }
/* The longest chain bounds the lock-step walk iterations below */
161 0 : uint chain_max = 0U;
162 0 : for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
163 0 : chain_max = fd_uint_max( chain_max, chain_cnt[ i ] );
164 0 : }
165 :
166 : /* Parallel walk hash chains */
/* dummy_rec is a sentinel for lanes whose chain is exhausted: every
   lane does an unconditional load/compare each iteration instead of
   branching, keeping memory accesses of all lanes in flight together. */
167 0 : static fd_funk_rec_t dummy_rec = { .map_next = UINT_MAX };
168 0 : fd_funk_rec_t * rec[ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
169 0 : for( ulong j=0UL; j<chain_max; j++ ) {
170 0 : for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
171 0 : uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
172 0 : uchar const * pubkey = frame+0x10UL;
173 0 : int const has_node = j<chain_cnt[ i ];
174 0 : fd_funk_rec_t * node = has_node ? rec_tbl+map_node[ i ] : &dummy_rec;
175 0 : int const key_match = 0==memcmp( node->pair.key, pubkey, sizeof(fd_funk_rec_key_t) );
176 0 : if( has_node && key_match ) rec[ i ] = node;
177 0 : map_node[ i ] = node->map_next;
178 0 : }
179 0 : }
180 :
181 : /* Create map entries */
/* insert_limit is lowered when a duplicate forces an early exit */
182 0 : ulong insert_limit = FD_SSPARSE_ACC_BATCH_MAX;
183 0 : for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
184 0 : ulong slot = result ? result->account_batch.slot : buffered_batch->slot;
/* Fixed-offset decode of the account frame (same layout as
   streamlined_insert reads) */
185 0 : uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
186 0 : uchar const * pubkey = frame+0x10UL;
187 0 : ulong data_len = fd_ulong_load_8_fast( frame+0x08UL );
188 0 : ulong lamports = fd_ulong_load_8_fast( frame+0x30UL );
189 0 : ulong rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
190 0 : _Bool executable = !!frame[ 0x60UL ];
191 0 : uchar const * data = frame+0x88UL;
192 0 : uchar owner[32]; memcpy( owner, frame+0x40UL, 32UL );
193 0 : fd_funk_rec_key_t key = FD_LOAD( fd_funk_rec_key_t, pubkey );
194 :
195 0 : ctx->metrics.accounts_loaded++;
196 0 : fd_funk_rec_t * r = rec[ i ];
197 0 : if( FD_LIKELY( !r ) ) { /* optimize for new account */
198 0 : r = fd_funk_rec_pool_acquire( funk->rec_pool, NULL, 0, NULL );
199 0 : FD_TEST( r );
200 0 : ulong rec_idx = (ulong)( r - rec_tbl );
201 :
202 0 : fd_funk_txn_xid_copy( r->pair.xid, ctx->xid );
203 0 : fd_funk_rec_key_copy( r->pair.key, &key );
204 0 : r->map_next = 0U;
205 0 : r->next_idx = UINT_MAX;
206 0 : r->prev_idx = UINT_MAX;
207 0 : r->val_sz = 0;
208 0 : r->val_max = 0;
209 0 : r->tag = 0;
210 0 : r->val_gaddr = 0UL;
211 :
/* NOTE(review): presumably initializes the record's version/lock
   word to the freshly-inserted state, mirroring fd_funk's own map
   insert internals — confirm against fd_funk_rec_map. */
212 0 : funk->rec_lock[ rec_idx ] = fd_funk_rec_ver_lock( 1UL, 0UL );
213 :
214 : /* Insert to hash map. In theory, a key could appear twice in the
215 : same batch. All accounts in a batch are guaranteed to be from
216 : the same slot though, so this is fine, assuming that accdb code
217 : gracefully handles duplicate hash map entries. */
/* Manual push-front onto the chain: bump the chain's element count
   (preserving its version bits) and link the new record as head. */
218 0 : fd_funk_rec_map_shmem_private_chain_t * chain = &chain_tbl[ chain_idx[ i ] ];
219 0 : ulong ver_cnt = chain->ver_cnt;
220 0 : uint head_cidx = chain->head_cidx;
221 0 : chain->ver_cnt = fd_funk_rec_map_private_vcnt( fd_funk_rec_map_private_vcnt_ver( ver_cnt ), fd_funk_rec_map_private_vcnt_cnt( ver_cnt )+1UL );
222 0 : chain->head_cidx = (uint)( r-rec_tbl );
223 0 : r->map_next = head_cidx;
224 0 : rec[ i ] = r;
225 0 : } else { /* existing record for key found */
226 0 : fd_account_meta_t const * existing = fd_funk_val( r, funk->wksp );
227 0 : if( FD_UNLIKELY( !existing ) ) FD_LOG_HEXDUMP_NOTICE(( "r", r, sizeof(fd_funk_rec_t) ));
228 0 : FD_TEST( existing );
229 0 : if( existing->slot > slot ) {
230 0 : rec[ i ] = NULL; /* skip record if existing value is newer */
231 : /* send the skipped account to the subtracting hash tile */
232 0 : ctx->metrics.accounts_ignored++;
233 0 : fd_snapin_send_duplicate_account( ctx, lamports, data, data_len, executable, owner, pubkey, 1, &early_exit );
234 0 : } else if( slot > existing->slot) {
235 : /* send the to-be-replaced account to the subtracting hash tile */
236 0 : ctx->metrics.accounts_replaced++;
237 0 : ctx->dup_capitalization = fd_ulong_sat_add( ctx->dup_capitalization, existing->lamports );
238 0 : fd_snapin_send_duplicate_account( ctx, existing->lamports, (uchar const *)existing + sizeof(fd_account_meta_t), existing->dlen, existing->executable, existing->owner, pubkey, 1, &early_exit );
239 0 : } else { /* slot==existing->slot */
/* two versions of the same account at the same slot should be
   impossible in a well-formed snapshot — abort */
240 0 : FD_TEST( 0 );
241 0 : }
242 :
243 0 : if( FD_LIKELY( early_exit ) ) {
244 : /* buffer account batch if not already buffered */
245 0 : if( FD_LIKELY( result && i<FD_SSPARSE_ACC_BATCH_MAX-1UL ) ) {
246 0 : FD_TEST( ctx->buffered_batch.batch_cnt==0UL );
247 0 : fd_memcpy( ctx->buffered_batch.batch, result->account_batch.batch, sizeof(uchar const*)*FD_SSPARSE_ACC_BATCH_MAX );
248 0 : ctx->buffered_batch.slot = result->account_batch.slot;
249 0 : ctx->buffered_batch.batch_cnt = result->account_batch.batch_cnt;
250 0 : ctx->buffered_batch.remaining_idx = i + 1UL;
251 0 : }
252 :
/* still insert entries [start_idx, i] before returning */
253 0 : insert_limit = i+1UL;
254 0 : break;
255 0 : }
256 0 : }
257 0 : }
258 :
259 : /* Actually insert accounts */
/* rec[i]==NULL means the slot comparison above chose to skip it */
260 0 : for( ulong i=start_idx; i<insert_limit; i++ ) {
261 0 : uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
262 0 : ulong slot = result ? result->account_batch.slot : buffered_batch->slot;
263 0 : if( rec[ i ] ) {
264 0 : streamlined_insert( ctx, rec[ i ], frame, slot );
265 0 : }
266 0 : }
267 :
/* Update resume state: batch done -> reset; otherwise remember where
   to pick up next call */
268 0 : if( FD_LIKELY( buffered_batch ) ) {
269 0 : if( FD_LIKELY( insert_limit==FD_SSPARSE_ACC_BATCH_MAX ) ) {
270 0 : buffered_batch->batch_cnt = 0UL;
271 0 : buffered_batch->remaining_idx = 0UL;
272 0 : } else {
273 0 : buffered_batch->remaining_idx = insert_limit;
274 0 : }
275 0 : }
276 :
277 0 : return early_exit;
278 0 : }
279 :
280 : void
281 : fd_snapin_read_account_funk( fd_snapin_tile_t * ctx,
282 : void const * acct_addr,
283 : fd_account_meta_t * meta,
284 : uchar * data,
285 0 : ulong data_max ) {
286 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
287 :
288 : /* Start a speculative database query.
289 : It is assumed that no conflicting database accesses take place
290 : while the account is being read from funk. */
291 :
292 0 : fd_accdb_ro_t ro[1];
293 0 : if( FD_UNLIKELY( !fd_accdb_open_ro( ctx->accdb, ro, ctx->xid, acct_addr ) ) ) {
294 0 : return;
295 0 : }
296 :
297 0 : ulong data_sz = fd_accdb_ref_data_sz( ro );
298 0 : if( FD_UNLIKELY( data_sz>data_max ) ) {
299 0 : FD_BASE58_ENCODE_32_BYTES( acct_addr, acct_addr_b58 );
300 0 : FD_LOG_CRIT(( "failed to read account %s: account data size (%lu bytes) exceeds buffer size (%lu bytes)",
301 0 : acct_addr_b58, data_sz, data_max ));
302 0 : }
303 :
304 0 : memcpy( meta->owner, fd_accdb_ref_owner( ro ), sizeof(fd_pubkey_t) );
305 0 : meta->lamports = fd_accdb_ref_lamports( ro );
306 0 : meta->slot = fd_accdb_ref_slot( ro );
307 0 : meta->dlen = (uint)data_sz;
308 0 : meta->executable = !!fd_accdb_ref_exec_bit( ro );
309 0 : fd_memcpy( data, fd_accdb_ref_data_const( ro ), data_sz );
310 :
311 0 : fd_accdb_close_ro( ctx->accdb, ro );
312 0 : }
|