Line data Source code
1 : #include "fd_accdb_admin_v2_private.h"
2 : #include "../fd_flamenco_base.h"
3 : #include "../runtime/fd_runtime_const.h" /* FD_RUNTIME_ACC_SZ_MAX */
4 : #include "../../vinyl/data/fd_vinyl_data.h"
5 :
6 : /***********************************************************************
7 :
8 : fd_accdb_admin_v2_root.c contains the account rooting algorithm.
9 :
10 : This algorithm is designed to amortize vinyl I/O latency by
11 : processing accounts in batches.
12 :
13 : For each batch of accounts, it does the following logic:
14 :
15 : - ACQUIRE batch request for account updates
16 : - ERASE batch request for account deletions
17 : - Spin wait for ACQUIRE completion
18 : - Copy back modified accounts
19 : - RELEASE batch request for account updates
20 : - Spin wait for ERASE, RELEASE completions
21 : - Free records from funk
22 :
23 : ***********************************************************************/
24 :
25 : /* vinyl_spin_wait waits for completion of a vinyl request and asserts
26 : that all requests completed successfully. */
27 :
28 : static void
29 : vinyl_spin_wait( fd_vinyl_comp_t const * comp,
30 : fd_vinyl_key_t const * key0,
31 : schar const * err0,
32 : ulong cnt,
33 0 : char const * req_type_cstr ) {
34 :
35 : /* FIXME use a load-acquire here, such that later loads are ordered
36 : past this load */
37 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
38 0 : FD_COMPILER_MFENCE();
39 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
40 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
41 0 : FD_LOG_CRIT(( "vinyl tile rejected my %s request (%i-%s)",
42 0 : req_type_cstr, comp_err, fd_vinyl_strerror( comp_err ) ));
43 0 : }
44 :
45 0 : for( ulong i=0UL; i<cnt; i++ ) {
46 0 : int err = err0[ i ];
47 0 : if( FD_UNLIKELY( err!=FD_VINYL_SUCCESS && err!=FD_VINYL_ERR_KEY ) ) {
48 0 : FD_BASE58_ENCODE_32_BYTES( key0[i].uc, key_b58 );
49 0 : FD_LOG_CRIT(( "vinyl %s request failed for %s (%i-%s)",
50 0 : req_type_cstr, key_b58, err, fd_vinyl_strerror( err ) ));
51 0 : }
52 0 : }
53 0 : }
54 :
55 : /* funk_rec_write_lock spins until it gains a write lock for a record,
56 : increments the version number, and returns the updated ver_lock
57 : value. */
58 :
59 : static ulong
60 : fd_funk_rec_admin_lock( fd_funk_t const * funk,
61 0 : fd_funk_rec_t * rec ) {
62 0 : ulong rec_idx = (ulong)( rec - funk->rec_pool->ele );
63 0 : ulong volatile * vl = &funk->rec_lock[ rec_idx ];
64 0 : for(;;) {
65 0 : ulong const ver_lock = FD_VOLATILE_CONST( *vl );
66 0 : ulong const ver = fd_funk_rec_ver_bits ( ver_lock );
67 0 : ulong const lock = fd_funk_rec_lock_bits( ver_lock );
68 0 : if( FD_UNLIKELY( lock ) ) {
69 : /* Spin while there are active readers */
70 : /* FIXME kill client after spinning for 30 seconds to prevent silent deadlock */
71 0 : FD_SPIN_PAUSE();
72 0 : continue;
73 0 : }
74 0 : ulong const new_ver = fd_funk_rec_ver_inc( ver );
75 0 : ulong const new_vl = fd_funk_rec_ver_lock( new_ver, FD_FUNK_REC_LOCK_MASK );
76 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( vl, ver_lock, new_vl )!=ver_lock ) ) {
77 0 : FD_SPIN_PAUSE();
78 0 : continue;
79 0 : }
80 0 : return new_vl;
81 0 : }
82 0 : }
83 :
84 : static void
85 : fd_funk_rec_admin_unlock( fd_funk_t const * funk,
86 : fd_funk_rec_t * rec,
87 0 : ulong ver_lock ) {
88 0 : ulong rec_idx = (ulong)( rec - funk->rec_pool->ele );
89 0 : ulong volatile * vl = &funk->rec_lock[ rec_idx ];
90 0 : FD_VOLATILE( *vl ) = fd_funk_rec_ver_lock( fd_funk_rec_ver_bits( ver_lock ), 0UL );
91 0 : }
92 :
93 : static void
94 : funk_free_rec( fd_funk_t * funk,
95 0 : fd_funk_rec_t * rec ) {
96 : /* Acquire admin lock (kick out readers)
97 :
98 : Note: At this point, well-behaving external readers will abandon a
99 : read-lock attempt if they observe this active write lock. (An
100 : admin lock always implies the record is about to die) */
101 :
102 0 : FD_COMPILER_MFENCE();
103 0 : ulong ver_lock = fd_funk_rec_admin_lock( funk, rec );
104 :
105 : /* Free record */
106 :
107 0 : memset( &rec->pair, 0, sizeof(fd_funk_xid_key_pair_t) );
108 0 : FD_COMPILER_MFENCE();
109 0 : rec->map_next = FD_FUNK_REC_IDX_NULL;
110 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
111 0 : fd_funk_rec_admin_unlock( funk, rec, ver_lock );
112 0 : fd_funk_rec_pool_release( funk->rec_pool, rec, 1 );
113 0 : }
114 :
115 : /* funk_gc_chain optimistically deletes all but the newest rooted
116 : revisions of rec. This possibly deletes 'rec'. Returns rec if rec
117 : is the only known rooted revision, otherwise returns NULL (if rec was
118 : deleted). Note that due to edge cases, revisions that are not in the
119 : oldest tracked slot, may not reliably get cleaned up. (The oldest
120 : tracked slot always gets cleaned up, though.) */
121 :
122 : static fd_funk_rec_t *
123 : funk_gc_chain( fd_accdb_admin_v2_t * const admin,
124 0 : fd_funk_rec_t * const rec ) {
125 :
126 0 : fd_accdb_lineage_t * lineage = admin->root_lineage;
127 0 : fd_funk_t * funk = admin->v1->funk;
128 0 : fd_funk_rec_t * rec_pool = funk->rec_pool->ele;
129 0 : ulong rec_max = funk->rec_pool->ele_max;
130 0 : ulong seed = funk->rec_map->map->seed;
131 0 : ulong chain_cnt = funk->rec_map->map->chain_cnt;
132 0 : ulong root_slot = lineage->fork[0].ul[0];
133 :
134 0 : ulong hash = fd_funk_rec_map_key_hash( &rec->pair, seed );
135 0 : ulong chain_idx = (hash & (chain_cnt-1UL) );
136 :
137 : /* Lock rec_map chain */
138 :
139 0 : int lock_err = fd_funk_rec_map_iter_lock( funk->rec_map, &chain_idx, 1UL, FD_MAP_FLAG_BLOCKING );
140 0 : if( FD_UNLIKELY( lock_err!=FD_MAP_SUCCESS ) ) {
141 0 : FD_LOG_CRIT(( "fd_funk_rec_map_iter_lock failed (%i-%s)", lock_err, fd_map_strerror( lock_err ) ));
142 0 : }
143 :
144 0 : fd_funk_rec_map_shmem_private_chain_t * chain =
145 0 : fd_funk_rec_map_shmem_private_chain( funk->rec_map->map, 0UL ) + chain_idx;
146 0 : ulong ver =
147 0 : fd_funk_rec_map_private_vcnt_ver( FD_VOLATILE_CONST( chain->ver_cnt ) );
148 0 : FD_CRIT( ver&1UL, "chain is not locked" );
149 :
150 : /* Walk map chain */
151 :
152 0 : fd_funk_rec_t * found_rec = NULL;
153 0 : uint * pnext = &chain->head_cidx;
154 0 : uint cur = *pnext;
155 0 : ulong chain_len = 0UL;
156 0 : ulong iter = 0UL;
157 0 : while( cur!=FD_FUNK_REC_IDX_NULL ) {
158 0 : if( FD_UNLIKELY( iter++ > rec_max ) ) FD_LOG_CRIT(( "cycle detected in rec_map chain %lu", chain_idx ));
159 :
160 : /* Is this node garbage? */
161 :
162 0 : fd_funk_rec_t * node = &funk->rec_pool->ele[ cur ];
163 0 : if( FD_UNLIKELY( cur==node->map_next ) ) FD_LOG_CRIT(( "accdb corruption detected: cycle in rec_map chain %lu", chain_idx ));
164 0 : cur = node->map_next;
165 0 : if( !fd_funk_rec_key_eq( rec->pair.key, node->pair.key ) ) goto retain;
166 0 : if( node->pair.xid->ul[0]>root_slot ) goto retain;
167 0 : if( !found_rec ) {
168 0 : found_rec = node;
169 0 : goto retain;
170 0 : }
171 :
172 : /* No longer need this node */
173 :
174 0 : if( node->pair.xid->ul[0] > rec->pair.xid->ul[0] ) {
175 : /* If this node is newer than the to-be-deleted slot, need to
176 : remove it from the transaction's record list. */
177 0 : uint neigh_prev = node->prev_idx;
178 0 : uint neigh_next = node->next_idx;
179 0 : if( neigh_prev==FD_FUNK_REC_IDX_NULL ||
180 0 : neigh_next==FD_FUNK_REC_IDX_NULL ) {
181 : /* Node is first or last of transaction -- too bothersome to
182 : remove it from the transaction's record list */
183 0 : goto retain;
184 0 : }
185 0 : rec_pool[ neigh_next ].prev_idx = neigh_prev;
186 0 : rec_pool[ neigh_prev ].next_idx = neigh_next;
187 0 : }
188 :
189 : /* Destroy this node */
190 :
191 0 : funk_free_rec( funk, node );
192 0 : *pnext = cur;
193 0 : continue;
194 :
195 0 : retain:
196 0 : pnext = &node->map_next;
197 0 : chain_len++;
198 0 : }
199 :
200 : /* Unlock rec_map chain */
201 :
202 0 : FD_COMPILER_MFENCE();
203 0 : FD_VOLATILE( chain->ver_cnt ) =
204 0 : fd_funk_rec_map_private_vcnt( ver+1UL, chain_len );
205 0 : FD_COMPILER_MFENCE();
206 0 : return found_rec==rec ? found_rec : NULL;
207 0 : }
208 :
209 : /* Main algorithm */
210 :
211 : fd_funk_rec_t *
212 : fd_accdb_v2_root_batch( fd_accdb_admin_v2_t * admin,
213 0 : fd_funk_rec_t * rec0 ) {
214 0 : long t_start = fd_tickcount();
215 :
216 0 : fd_funk_t * funk = admin->v1->funk; /* unrooted DB */
217 0 : fd_wksp_t * funk_wksp = funk->wksp; /* shm workspace containing unrooted accounts */
218 0 : fd_funk_rec_t * rec_pool = funk->rec_pool->ele; /* funk rec arena */
219 0 : fd_vinyl_rq_t * rq = admin->vinyl_rq; /* "request queue "*/
220 0 : fd_vinyl_req_pool_t * req_pool = admin->vinyl_req_pool; /* "request pool" */
221 0 : fd_wksp_t * req_wksp = admin->vinyl_req_wksp; /* shm workspace containing request buffer */
222 0 : fd_wksp_t * data_wksp = admin->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
223 0 : ulong link_id = admin->vinyl_link_id; /* vinyl client ID */
224 :
225 : /* Collect funk request batch */
226 :
227 0 : fd_funk_rec_t * recs[ FD_ACCDB_ROOT_BATCH_MAX ];
228 0 : ulong rec_cnt;
229 :
230 0 : fd_funk_rec_t * next = rec0;
231 0 : for( rec_cnt=0UL; next && rec_cnt<FD_ACCDB_ROOT_BATCH_MAX; ) {
232 0 : fd_funk_rec_t * cur = next;
233 0 : if( fd_funk_rec_idx_is_null( cur->next_idx ) ) {
234 0 : next = NULL;
235 0 : } else {
236 0 : next = &rec_pool[ cur->next_idx ];
237 0 : }
238 0 : cur->prev_idx = FD_FUNK_REC_IDX_NULL;
239 0 : cur->next_idx = FD_FUNK_REC_IDX_NULL;
240 :
241 0 : if( funk_gc_chain( admin, cur ) ) {
242 0 : recs[ rec_cnt++ ] = cur;
243 0 : }
244 0 : }
245 :
246 : /* Partition batch into ACQUIRE (updates) and ERASE (deletions) */
247 :
248 0 : ulong acq_cnt = 0UL;
249 0 : ulong del_cnt;
250 0 : for( ulong i=0UL; i<rec_cnt; i++ ) {
251 0 : fd_account_meta_t const * meta = fd_funk_val( recs[ i ], funk_wksp );
252 0 : FD_CRIT( meta && recs[ i ]->val_sz>=sizeof(fd_account_meta_t), "corrupt funk_rec" );
253 0 : if( meta->lamports ) {
254 0 : fd_funk_rec_t * tmp = recs[ i ];
255 0 : recs[ i ] = recs[ acq_cnt ];
256 0 : recs[ acq_cnt ] = tmp;
257 0 : acq_cnt++;
258 0 : }
259 0 : }
260 0 : del_cnt = rec_cnt - acq_cnt;
261 :
262 : /* Create ACQUIRE and ERASE batch requests */
263 :
264 0 : ulong del_batch = fd_vinyl_req_pool_acquire( req_pool ); /* ERASE */
265 0 : ulong acq_batch = fd_vinyl_req_pool_acquire( req_pool ); /* ACQUIRE */
266 0 : fd_vinyl_key_t * acq_key0 = fd_vinyl_req_batch_key( req_pool, acq_batch );
267 0 : fd_vinyl_key_t * del_key0 = fd_vinyl_req_batch_key( req_pool, del_batch );
268 :
269 0 : for( ulong i=0UL; i<acq_cnt; i++ ) {
270 0 : fd_vinyl_key_init( &acq_key0[ i ], recs[ i ]->pair.key, 32UL );
271 0 : }
272 0 : for( ulong i=0UL; i<del_cnt; i++ ) {
273 0 : fd_vinyl_key_init( &del_key0[ i ], recs[ acq_cnt+i ]->pair.key, 32UL );
274 0 : }
275 :
276 : /* Send off ACQUIRE and ERASE requests */
277 :
278 0 : fd_vinyl_comp_t * acq_comp = fd_vinyl_req_batch_comp ( req_pool, acq_batch );
279 0 : fd_vinyl_comp_t * del_comp = fd_vinyl_req_batch_comp ( req_pool, del_batch );
280 0 : schar * acq_err0 = fd_vinyl_req_batch_err ( req_pool, acq_batch );
281 0 : schar * del_err0 = fd_vinyl_req_batch_err ( req_pool, del_batch );
282 0 : ulong * acq_val_gaddr0 = fd_vinyl_req_batch_val_gaddr( req_pool, acq_batch );
283 :
284 0 : memset( acq_comp, 0, sizeof(fd_vinyl_comp_t) );
285 0 : memset( del_comp, 0, sizeof(fd_vinyl_comp_t) );
286 0 : for( ulong i=0UL; i<acq_cnt; i++ ) acq_err0[ i ] = 0;
287 0 : for( ulong i=0UL; i<del_cnt; i++ ) del_err0[ i ] = 0;
288 0 : for( ulong i=0UL; i<acq_cnt; i++ ) {
289 0 : fd_account_meta_t const * src_meta = fd_funk_val( recs[ i ], funk_wksp );
290 :
291 0 : ulong data_sz = src_meta->dlen;
292 0 : FD_CRIT( data_sz<=FD_RUNTIME_ACC_SZ_MAX, "oversize account record" );
293 :
294 0 : ulong val_sz = sizeof(fd_account_meta_t) + data_sz;
295 0 : acq_val_gaddr0[ i ] = val_sz;
296 0 : admin->base.root_tot_sz += val_sz;
297 0 : }
298 :
299 0 : fd_vinyl_req_send_batch(
300 0 : rq, req_pool, req_wksp,
301 0 : admin->vinyl_req_id++, link_id,
302 0 : FD_VINYL_REQ_TYPE_ACQUIRE,
303 0 : FD_VINYL_REQ_FLAG_MODIFY |
304 0 : FD_VINYL_REQ_FLAG_IGNORE |
305 0 : FD_VINYL_REQ_FLAG_CREATE,
306 0 : acq_batch, acq_cnt
307 0 : );
308 0 : fd_vinyl_req_send_batch(
309 0 : rq, req_pool, req_wksp,
310 0 : admin->vinyl_req_id++, link_id,
311 0 : FD_VINYL_REQ_TYPE_ERASE,
312 0 : 0UL,
313 0 : del_batch, del_cnt
314 0 : );
315 :
316 : /* Spin for ACQUIRE completion */
317 :
318 0 : vinyl_spin_wait( acq_comp, acq_key0, acq_err0, acq_cnt, "ACQUIRE" );
319 0 : long t_acquire = fd_tickcount();
320 :
321 : /* Copy back modified accounts */
322 :
323 0 : for( ulong i=0UL; i<acq_cnt; i++ ) {
324 0 : fd_account_meta_t const * src_meta = fd_funk_val( recs[ i ], funk_wksp );
325 :
326 0 : ulong data_sz = src_meta->dlen;
327 0 : ulong val_sz = sizeof(fd_account_meta_t) + data_sz;
328 0 : FD_CRIT( data_sz<=FD_RUNTIME_ACC_SZ_MAX, "oversize account record" );
329 :
330 0 : fd_account_meta_t * dst_meta = fd_wksp_laddr_fast( data_wksp, acq_val_gaddr0[ i ] );
331 0 : fd_vinyl_info_t * val_info = fd_vinyl_data_info( dst_meta );
332 :
333 0 : fd_memcpy( dst_meta, src_meta, val_sz );
334 0 : val_info->val_sz = (uint)val_sz;
335 0 : }
336 :
337 : /* Send off RELEASE batch request (reuse acq_batch) */
338 :
339 0 : memset( acq_comp, 0, sizeof(fd_vinyl_comp_t) );
340 0 : for( ulong i=0UL; i<acq_cnt; i++ ) acq_err0[ i ] = 0;
341 0 : fd_vinyl_req_send_batch(
342 0 : rq, req_pool, req_wksp,
343 0 : admin->vinyl_req_id++, link_id,
344 0 : FD_VINYL_REQ_TYPE_RELEASE,
345 0 : FD_VINYL_REQ_FLAG_MODIFY,
346 0 : acq_batch, acq_cnt
347 0 : );
348 0 : long t_copy = fd_tickcount();
349 :
350 : /* Spin for ERASE, RELEASE completions */
351 :
352 0 : vinyl_spin_wait( del_comp, del_key0, del_err0, del_cnt, "ERASE" );
353 0 : fd_vinyl_req_pool_release( req_pool, del_batch );
354 :
355 0 : vinyl_spin_wait( acq_comp, acq_key0, acq_err0, acq_cnt, "RELEASE" );
356 0 : fd_vinyl_req_pool_release( req_pool, acq_batch );
357 0 : long t_release = fd_tickcount();
358 :
359 : /* Remove funk records */
360 :
361 0 : for( ulong i=0UL; i<rec_cnt; i++ ) {
362 0 : fd_funk_xid_key_pair_t pair = recs[ i ]->pair;
363 0 : fd_funk_rec_query_t query[1];
364 0 : int rm_err = fd_funk_rec_map_remove( funk->rec_map, &pair, NULL, query, FD_MAP_FLAG_BLOCKING );
365 0 : if( FD_UNLIKELY( rm_err!=FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_rec_map_remove failed (%i-%s)", rm_err, fd_map_strerror( rm_err ) ));
366 0 : funk_free_rec( funk, recs[ i ] );
367 0 : }
368 0 : long t_gc = fd_tickcount();
369 :
370 : /* Update metrics */
371 :
372 0 : admin->base.root_cnt += (uint)acq_cnt;
373 0 : admin->base.reclaim_cnt += (uint)del_cnt;
374 0 : admin->base.dt_vinyl += ( t_acquire - t_start ) + ( t_release - t_copy );
375 0 : admin->base.dt_copy += ( t_copy - t_acquire );
376 0 : admin->base.dt_gc += ( t_gc - t_release );
377 :
378 0 : return next;
379 0 : }
|