Line data Source code
1 : #include "fd_accdb_admin_v2_private.h"
2 :
3 : FD_STATIC_ASSERT( alignof(fd_accdb_admin_v2_t)<=alignof(fd_accdb_admin_t), layout );
4 : FD_STATIC_ASSERT( sizeof (fd_accdb_admin_v2_t)<=sizeof(fd_accdb_admin_t), layout );
5 :
6 : fd_accdb_admin_t *
7 : fd_accdb_admin_v2_init( fd_accdb_admin_t * accdb_,
8 : void * shfunk,
9 : void * shlocks,
10 : void * vinyl_rq,
11 : void * vinyl_data,
12 : void * vinyl_req_pool,
13 : ulong vinyl_link_id,
14 0 : ulong max_depth ) {
15 : /* Call superclass constructor */
16 0 : if( FD_UNLIKELY( !fd_accdb_admin_v1_init( accdb_, shfunk, shlocks ) ) ) {
17 0 : return NULL;
18 0 : }
19 0 : if( FD_UNLIKELY( !vinyl_data ) ) {
20 0 : FD_LOG_WARNING(( "NULL vinyl_data" ));
21 0 : return NULL;
22 0 : }
23 :
24 0 : fd_vinyl_rq_t * rq = fd_vinyl_rq_join( vinyl_rq );
25 0 : fd_vinyl_req_pool_t * req_pool = fd_vinyl_req_pool_join( vinyl_req_pool );
26 0 : if( FD_UNLIKELY( !rq || !req_pool ) ) {
27 : /* component joins log warning if this is reached */
28 0 : FD_LOG_WARNING(( "Failed to initialize database client" ));
29 0 : return NULL;
30 0 : }
31 :
32 0 : fd_accdb_admin_v2_t * accdb = fd_type_pun( accdb_ );
33 0 : accdb->root_lineage->max_depth = max_depth;
34 0 : accdb->vinyl_req_id = 0UL;
35 0 : accdb->vinyl_rq = rq;
36 0 : accdb->vinyl_link_id = vinyl_link_id;
37 0 : accdb->vinyl_data_wksp = vinyl_data;
38 0 : accdb->vinyl_req_wksp = fd_wksp_containing( req_pool );
39 0 : accdb->vinyl_req_pool = req_pool;
40 0 : accdb->base.accdb_type = FD_ACCDB_TYPE_V2;
41 0 : accdb->base.vt = &fd_accdb_admin_v2_vt;
42 0 : return accdb_;
43 0 : }
44 :
45 : static fd_accdb_admin_v2_t *
46 0 : downcast( fd_accdb_admin_t * admin ) {
47 0 : if( FD_UNLIKELY( !admin ) ) {
48 0 : FD_LOG_CRIT(( "NULL admin" ));
49 0 : }
50 0 : if( FD_UNLIKELY( admin->base.accdb_type!=FD_ACCDB_TYPE_V2 ) ) {
51 0 : FD_LOG_CRIT(( "corrupt accdb_admin handle" ));
52 0 : }
53 0 : return (fd_accdb_admin_v2_t *)admin;
54 0 : }
55 :
56 : void
57 0 : fd_accdb_admin_v2_fini( fd_accdb_admin_t * admin_ ) {
58 0 : fd_accdb_admin_v2_t * admin = downcast( admin_ );
59 :
60 0 : fd_vinyl_rq_leave( admin->vinyl_rq );
61 :
62 : /* superclass destructor */
63 0 : admin->base.accdb_type = FD_ACCDB_TYPE_V1;
64 0 : fd_accdb_admin_v1_fini( admin_ );
65 0 : }
66 :
67 : fd_funk_txn_xid_t
68 0 : fd_accdb_v2_root_get( fd_accdb_admin_t const * admin ) {
69 0 : return fd_accdb_v1_root_get( admin );
70 0 : }
71 :
72 : void
73 : fd_accdb_v2_attach_child( fd_accdb_admin_t * admin_,
74 : fd_funk_txn_xid_t const * xid_parent,
75 0 : fd_funk_txn_xid_t const * xid_new ) {
76 0 : fd_accdb_admin_v1_t * db = downcast( admin_ )->v1;
77 0 : FD_LOG_INFO(( "accdb txn xid %lu:%lu: created with parent %lu:%lu",
78 0 : xid_new ->ul[0], xid_new ->ul[1],
79 0 : xid_parent->ul[0], xid_parent->ul[1] ));
80 0 : fd_funk_txn_prepare( db->funk, xid_parent, xid_new );
81 0 : }
82 :
83 : void
84 : fd_accdb_v2_cancel( fd_accdb_admin_t * admin_,
85 0 : fd_funk_txn_xid_t const * xid ) {
86 0 : fd_accdb_admin_v2_t * admin = downcast( admin_ );
87 0 : admin->base.accdb_type = FD_ACCDB_TYPE_V1;
88 0 : fd_accdb_v1_cancel( admin_, xid );
89 0 : admin->base.accdb_type = FD_ACCDB_TYPE_V2;
90 0 : }
91 :
92 : static void
93 : publish_recs( fd_accdb_admin_v2_t * admin,
94 0 : fd_funk_txn_t * txn ) {
95 0 : fd_funk_rec_t * rec_pool = admin->v1->funk->rec_pool->ele;
96 0 : fd_funk_rec_t * head = !fd_funk_rec_idx_is_null( txn->rec_head_idx ) ?
97 0 : &rec_pool[ txn->rec_head_idx ] : NULL;
98 0 : txn->rec_head_idx = FD_FUNK_REC_IDX_NULL;
99 0 : txn->rec_tail_idx = FD_FUNK_REC_IDX_NULL;
100 0 : while( head ) {
101 0 : head = fd_accdb_v2_root_batch( admin, head );
102 0 : }
103 0 : }
104 :
105 : static void
106 : txn_unregister( fd_funk_t * funk,
107 0 : fd_funk_txn_t * txn ) {
108 0 : ulong child_idx = fd_funk_txn_idx( txn->child_head_cidx );
109 0 : while( FD_UNLIKELY( !fd_funk_txn_idx_is_null( child_idx ) ) ) {
110 0 : funk->txn_pool->ele[ child_idx ].parent_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
111 0 : child_idx = fd_funk_txn_idx( funk->txn_pool->ele[ child_idx ].sibling_next_cidx );
112 0 : }
113 :
114 0 : fd_funk_txn_xid_t xid[1]; fd_funk_txn_xid_copy( xid, fd_funk_txn_xid( txn ) );
115 0 : fd_funk_txn_map_query_t query[1];
116 0 : int remove_err = fd_funk_txn_map_remove( funk->txn_map, xid, NULL, query, 0 );
117 0 : if( FD_UNLIKELY( remove_err!=FD_MAP_SUCCESS ) ) {
118 0 : FD_LOG_CRIT(( "fd_accdb_publish failed: fd_funk_txn_map_remove failed: %i-%s", remove_err, fd_map_strerror( remove_err ) ));
119 0 : }
120 0 : }
121 :
122 : static void
123 : txn_free( fd_funk_t * funk,
124 0 : fd_funk_txn_t * txn ) {
125 0 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_FREE;
126 0 : txn->parent_cidx = UINT_MAX;
127 0 : txn->sibling_prev_cidx = UINT_MAX;
128 0 : txn->sibling_next_cidx = UINT_MAX;
129 0 : txn->child_head_cidx = UINT_MAX;
130 0 : txn->child_tail_cidx = UINT_MAX;
131 0 : fd_funk_txn_pool_release( funk->txn_pool, txn, 1 );
132 0 : }
133 :
134 : static void
135 : fd_accdb_txn_publish_one( fd_accdb_admin_v2_t * accdb,
136 0 : fd_funk_txn_t * txn ) {
137 0 : fd_funk_t * funk = accdb->v1->funk;
138 :
139 : /* Children of transaction are now children of root */
140 0 : funk->shmem->child_head_cidx = txn->child_head_cidx;
141 0 : funk->shmem->child_tail_cidx = txn->child_tail_cidx;
142 :
143 : /* Phase 1: Mark transaction as "last published" */
144 :
145 0 : fd_funk_txn_xid_t xid[1]; fd_funk_txn_xid_copy( xid, fd_funk_txn_xid( txn ) );
146 0 : if( FD_UNLIKELY( !fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn->parent_cidx ) ) ) ) {
147 0 : FD_LOG_CRIT(( "fd_accdb_txn_advance_root: parent of txn %lu:%lu is not root", xid->ul[0], xid->ul[1] ));
148 0 : }
149 0 : fd_funk_txn_xid_st_atomic( funk->shmem->last_publish, xid );
150 0 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: publish", (void *)txn, txn->xid.ul[0], txn->xid.ul[1] ));
151 :
152 : /* Phase 2: Drain users from transaction */
153 :
154 0 : ulong txn_idx = (ulong)( txn - funk->txn_pool->ele );
155 0 : fd_rwlock_write( &funk->txn_lock[ txn_idx ] );
156 0 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_PUBLISH;
157 :
158 : /* Phase 3: Move records from funk to vinyl */
159 :
160 0 : publish_recs( accdb, txn );
161 :
162 : /* Phase 4: Unregister transaction */
163 :
164 0 : txn_unregister( funk, txn );
165 :
166 : /* Phase 5: Free transaction object */
167 :
168 0 : fd_rwlock_unwrite( &funk->txn_lock[ txn_idx ] );
169 0 : txn_free( funk, txn );
170 0 : }
171 :
172 : void
173 : fd_accdb_v2_advance_root( fd_accdb_admin_t * accdb_,
174 0 : fd_funk_txn_xid_t const * xid ) {
175 0 : fd_accdb_admin_v2_t * accdb = downcast( accdb_ );
176 0 : fd_funk_t * funk = accdb->v1->funk;
177 :
178 0 : fd_accdb_lineage_set_fork( accdb->root_lineage, funk, xid );
179 :
180 : /* Assume no concurrent access to txn_map */
181 :
182 0 : fd_funk_txn_map_query_t query[1];
183 0 : int query_err = fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 );
184 0 : if( FD_UNLIKELY( query_err ) ) {
185 0 : FD_LOG_CRIT(( "fd_accdb_advance_root failed: fd_funk_txn_map_query_try(xid=%lu:%lu) returned (%i-%s)",
186 0 : xid->ul[0], xid->ul[1], query_err, fd_map_strerror( query_err ) ));
187 0 : }
188 0 : fd_funk_txn_t * txn = fd_funk_txn_map_query_ele( query );
189 :
190 0 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: advancing root",
191 0 : (void *)txn,
192 0 : xid->ul[0], xid->ul[1] ));
193 :
194 0 : fd_accdb_txn_cancel_siblings( accdb->v1, txn );
195 :
196 0 : fd_accdb_lineage_t * lineage = accdb->root_lineage;
197 0 : fd_funk_txn_xid_t oldest_xid = lineage->fork[ lineage->fork_depth-1UL ];
198 0 : if( fd_funk_txn_xid_eq_root( &oldest_xid ) && lineage->fork_depth>1UL ) {
199 0 : oldest_xid = lineage->fork[ lineage->fork_depth-2UL ];
200 0 : }
201 :
202 0 : ulong delay = xid->ul[0] - oldest_xid.ul[0];
203 : /* genesis_override is necessary when bootstrapping from genesis,
204 : without requiring fd_accdb_admin_v2_delay_set to accept 0. */
205 0 : int genesis_override = !xid->ul[0];
206 0 : if( delay >= accdb->slot_delay || genesis_override ) {
207 0 : FD_LOG_INFO(( "accdb xid %lu:%lu: pruning",
208 0 : oldest_xid.ul[0], oldest_xid.ul[1] ));
209 0 : fd_funk_txn_t * oldest = &funk->txn_pool->ele[ funk->shmem->child_head_cidx ];
210 0 : FD_TEST( fd_funk_txn_xid_eq( &oldest_xid, &oldest->xid ) );
211 0 : fd_accdb_txn_publish_one( accdb, oldest );
212 0 : }
213 0 : }
214 :
215 : void
216 : fd_accdb_admin_v2_delay_set( fd_accdb_admin_t * accdb_,
217 0 : ulong slot_delay ) {
218 0 : fd_accdb_admin_v2_t * accdb = downcast( accdb_ );
219 0 : if( FD_UNLIKELY( !slot_delay ) ) FD_LOG_CRIT(( "invalid slot_delay (%lu)", slot_delay ));
220 0 : accdb->slot_delay = slot_delay;
221 0 : }
222 :
223 : fd_accdb_admin_vt_t const fd_accdb_admin_v2_vt = {
224 : .fini = fd_accdb_admin_v2_fini,
225 : .root_get = fd_accdb_v2_root_get,
226 : .attach_child = fd_accdb_v2_attach_child,
227 : .advance_root = fd_accdb_v2_advance_root,
228 : .cancel = fd_accdb_v2_cancel
229 : };
|