Line data Source code
1 : #include "fd_policy.h"
2 : #include "../../disco/metrics/fd_metrics.h"
3 :
4 : #define NONCE_NULL (UINT_MAX)
5 0 : #define DEFER_REPAIR_MS (200UL)
6 0 : #define TARGET_TICK_PER_SLOT (64.0)
7 0 : #define MS_PER_TICK (400.0 / TARGET_TICK_PER_SLOT)
8 :
/* fd_policy_new formats a memory region as a policy object.  shmem
   must be non-NULL, aligned to fd_policy_align(), and have at least
   fd_policy_footprint( dedup_max, peer_max ) bytes available.
   dedup_max bounds the request-dedup cache, peer_max bounds the peer
   set, seed seeds the hash maps, and rnonce_ss is copied into the
   policy for nonce computation.  Returns shmem on success, NULL (with
   a warning logged) on bad parameters. */
void *
fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed, fd_rnonce_ss_t const * rnonce_ss ) {

  if( FD_UNLIKELY( !shmem ) ) {
    FD_LOG_WARNING(( "NULL mem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_policy_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned mem" ));
    return NULL;
  }

  ulong footprint = fd_policy_footprint( dedup_max, peer_max );
  fd_memset( shmem, 0, footprint );

  /* Carve the flat region into the policy struct and its subordinate
     structures.  The append order and alignments here must match
     fd_policy_footprint exactly (checked by the FD_TEST below). */
  ulong peer_chain_cnt = fd_policy_peer_map_chain_cnt_est( peer_max );
  FD_SCRATCH_ALLOC_INIT( l, shmem );
  fd_policy_t * policy = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(), sizeof(fd_policy_t) );
  void * dedup_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_map_align(),  fd_policy_dedup_map_footprint ( dedup_max ) );
  void * dedup_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_pool_align(), fd_policy_dedup_pool_footprint( dedup_max ) );
  void * dedup_lru  = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_lru_align(),  fd_policy_dedup_lru_footprint() );
  void * peers      = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_map_align(),   fd_policy_peer_map_footprint( peer_chain_cnt ) );
  void * peers_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_pool_align(),  fd_policy_peer_pool_footprint( peer_max ) );
  void * peers_fast = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_dlist_align(), fd_policy_peer_dlist_footprint() );
  void * peers_slow = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_dlist_align(), fd_policy_peer_dlist_footprint() );
  FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_policy_align() ) == (ulong)shmem + footprint );

  policy->dedup.map  = fd_policy_dedup_map_new ( dedup_map, dedup_max, seed );
  policy->dedup.pool = fd_policy_dedup_pool_new( dedup_pool, dedup_max );
  policy->dedup.lru  = fd_policy_dedup_lru_new ( dedup_lru );
  policy->peers.map  = fd_policy_peer_map_new  ( peers, peer_chain_cnt, seed );
  policy->peers.pool = fd_policy_peer_pool_new ( peers_pool, peer_max );
  policy->peers.fast = fd_policy_peer_dlist_new( peers_fast );
  policy->peers.slow = fd_policy_peer_dlist_new( peers_slow );
  /* ULONG_MAX marks "turbine slot0 unknown": until set, the throttle
     in passes_throttle_threshold always allows repair. */
  policy->turbine_slot0 = ULONG_MAX;
  policy->rnonce_ss[0] = *rnonce_ss;

  return shmem;
}
49 :
/* fd_policy_join joins the caller to a policy region previously
   formatted by fd_policy_new.  The region must reside inside a
   workspace.  Returns a local handle on success, NULL (with a warning
   logged) on failure. */
fd_policy_t *
fd_policy_join( void * shpolicy ) {
  fd_policy_t * policy = (fd_policy_t *)shpolicy;

  if( FD_UNLIKELY( !policy ) ) {
    FD_LOG_WARNING(( "NULL policy" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned policy" ));
    return NULL;
  }

  fd_wksp_t * wksp = fd_wksp_containing( policy );
  if( FD_UNLIKELY( !wksp ) ) {
    FD_LOG_WARNING(( "policy must be part of a workspace" ));
    return NULL;
  }

  /* Convert the sub-structure handles stored by fd_policy_new into
     local joins. */
  policy->dedup.map  = fd_policy_dedup_map_join ( policy->dedup.map );
  policy->dedup.pool = fd_policy_dedup_pool_join( policy->dedup.pool );
  policy->dedup.lru  = fd_policy_dedup_lru_join ( policy->dedup.lru );
  policy->peers.map  = fd_policy_peer_map_join  ( policy->peers.map );
  policy->peers.pool = fd_policy_peer_pool_join ( policy->peers.pool );
  policy->peers.fast = fd_policy_peer_dlist_join( policy->peers.fast );
  policy->peers.slow = fd_policy_peer_dlist_join( policy->peers.slow );

  /* Start the round-robin peer selector at the head of the slow
     bucket, stage 0 of the bucket schedule. */
  policy->peers.select.iter  = fd_policy_peer_dlist_iter_fwd_init( policy->peers.slow, policy->peers.pool );
  policy->peers.select.stage = 0;

  return policy;
}
83 :
84 : void *
85 0 : fd_policy_leave( fd_policy_t const * policy ) {
86 :
87 0 : if( FD_UNLIKELY( !policy ) ) {
88 0 : FD_LOG_WARNING(( "NULL policy" ));
89 0 : return NULL;
90 0 : }
91 :
92 0 : return (void *)policy;
93 0 : }
94 :
95 : void *
96 0 : fd_policy_delete( void * policy ) {
97 :
98 0 : if( FD_UNLIKELY( !policy ) ) {
99 0 : FD_LOG_WARNING(( "NULL policy" ));
100 0 : return NULL;
101 0 : }
102 :
103 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
104 0 : FD_LOG_WARNING(( "misaligned policy" ));
105 0 : return NULL;
106 0 : }
107 :
108 0 : return policy;
109 0 : }
110 :
/* dedup_evict evicts the least-recently-used dedup entry: pops the
   head of the LRU list, removes it from the dedup map, and returns it
   to the element pool. */

static void
dedup_evict( fd_policy_t * policy ) {
  fd_policy_dedup_ele_t * ele = fd_policy_dedup_lru_ele_pop_head( policy->dedup.lru, policy->dedup.pool );
  fd_policy_dedup_map_ele_remove( policy->dedup.map, &ele->key, NULL, policy->dedup.pool );
  fd_policy_dedup_pool_ele_release( policy->dedup.pool, ele );
}
119 :
/* dedup_next returns 1 if key is deduped (i.e. was requested within
   the last FD_POLICY_DEDUP_TIMEOUT of now), 0 otherwise.  On a 0
   return the caller is expected to send the request; now is recorded
   as the request timestamp, opening a new timeout window. */
static int
dedup_next( fd_policy_t * policy, ulong key, long now ) {
  fd_policy_dedup_t * dedup = &policy->dedup;
  fd_policy_dedup_ele_t * ele = fd_policy_dedup_map_ele_query( dedup->map, &key, NULL, dedup->pool );
  if( FD_UNLIKELY( !ele ) ) {
    /* First sighting of key.  Acquire an entry (evicting the LRU entry
       if the pool is exhausted) and zero req_ts so the timeout check
       below fails and the request is allowed through. */
    if( FD_UNLIKELY( !fd_policy_dedup_pool_free( dedup->pool ) ) ) dedup_evict( policy );
    ele = fd_policy_dedup_pool_ele_acquire( dedup->pool );
    ele->key = key;
    ele->req_ts = 0;
    fd_policy_dedup_map_ele_insert ( dedup->map, ele, dedup->pool );
    fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool );
  }
  if( FD_LIKELY( now < ele->req_ts + (long)FD_POLICY_DEDUP_TIMEOUT ) ) {
    /* Requested recently: move the entry to the LRU tail (most
       recently touched) and suppress the duplicate. */
    fd_policy_dedup_lru_ele_remove( dedup->lru, ele, dedup->pool );
    fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool );
    return 1;
  }
  ele->req_ts = now; /* allow the request; start a new timeout window */
  return 0;
}
141 :
142 0 : static ulong ts_ms( long wallclock ) {
143 0 : return (ulong)wallclock / (ulong)1e6;
144 0 : }
145 :
/* passes_throttle_threshold returns 1 if block ele is eligible for
   repair now, 0 if we should keep waiting for turbine to deliver its
   shreds.  Slots older than turbine_slot0 will never arrive via
   turbine, so they are always eligible. */
static int
passes_throttle_threshold( fd_policy_t * policy, fd_forest_blk_t * ele ) {
  if( FD_UNLIKELY( ele->slot < policy->turbine_slot0 ) ) return 1;
  /* Essentially is checking if current duration of block ( from the
     first shred received until now ) is greater than the highest tick
     received + 200ms. */
  double current_duration = (double)(fd_tickcount() - ele->first_shred_ts) / fd_tempo_tick_per_ns(NULL); /* ns elapsed since first shred */
  double tick_plus_buffer = (ele->est_buffered_tick_recv * MS_PER_TICK + DEFER_REPAIR_MS) * 1e6; /* ms -> ns; change to 400e6 for a slot duration policy */

  if( current_duration >= tick_plus_buffer ){
    FD_MCNT_INC( REPAIR, EAGER_REPAIR_AGGRESSES, 1 );
    return 1;
  }
  return 0;
}
161 :
/* fd_policy_peer_select returns the pubkey of the next peer to target
   with a repair request, or NULL if no peers are tracked.  Peers are
   visited round-robin within the current latency bucket (fast or slow
   dlist); when the current bucket's iterator is exhausted the selector
   advances through the bucket_stages schedule and restarts iteration
   on the next bucket.  Mutates policy->peers.select. */
fd_pubkey_t const *
fd_policy_peer_select( fd_policy_t * policy ) {
  fd_policy_peer_dlist_t * best_dlist = policy->peers.fast;
  fd_policy_peer_dlist_t * worst_dlist = policy->peers.slow;
  fd_policy_peer_t * pool = policy->peers.pool;

  if( FD_UNLIKELY( fd_policy_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;

  fd_policy_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;

  /* Skip exhausted / empty buckets.  Terminates because at least one
     peer exists (checked above), so some bucket is non-empty. */
  while( FD_UNLIKELY( fd_policy_peer_dlist_iter_done( policy->peers.select.iter, dlist, pool ) ) ) {
    policy->peers.select.stage = (policy->peers.select.stage + 1) % (sizeof(bucket_stages) / sizeof(uint));
    dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;
    policy->peers.select.iter = fd_policy_peer_dlist_iter_fwd_init( dlist, pool );
  }
  fd_policy_peer_t * select = fd_policy_peer_dlist_iter_ele( policy->peers.select.iter, dlist, pool );
  policy->peers.select.iter = fd_policy_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, pool );
  return &select->key;
}
181 :
/* fd_policy_next drives the repair policy: returns the next repair
   protocol message to send, or NULL if there is nothing to request
   right now.  Priority: (1) orphan requests for subtree roots whose
   ancestry is unknown, (2) highest-shred / shred requests along the
   forest iterators.  now is a wallclock timestamp in ns.  *charge_busy
   is set to 1 iff this call did meaningful work. */
fd_repair_msg_t const *
fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair, long now, ulong highest_known_slot, int * charge_busy ) {
  fd_forest_blk_t * pool = fd_forest_pool( forest );
  fd_forest_subtlist_t * subtlist = fd_forest_subtlist( forest );
  *charge_busy = 0;

  if( FD_UNLIKELY( forest->root == ULONG_MAX ) ) return NULL;
  if( FD_UNLIKELY( fd_policy_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;

  fd_repair_msg_t * out = NULL;
  ulong now_ms = ts_ms( now );

  /* First priority: request the ancestry of orphaned subtree roots.
     dedup_next rate-limits each orphan to once per dedup timeout
     window. */
  for( fd_forest_subtlist_iter_t iter = fd_forest_subtlist_iter_fwd_init( subtlist, pool );
       !fd_forest_subtlist_iter_done ( iter, subtlist, pool );
       iter = fd_forest_subtlist_iter_fwd_next( iter, subtlist, pool ) ) {
    *charge_busy = 1;
    fd_forest_blk_t * orphan = fd_forest_subtlist_iter_ele( iter, subtlist, pool );
    ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_ORPHAN, orphan->slot, UINT_MAX );
    if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
      uint nonce = fd_rnonce_ss_compute( policy->rnonce_ss, 0, orphan->slot, 0U, now );
      out = fd_repair_orphan( repair, fd_policy_peer_select( policy ), now_ms, nonce, orphan->slot );
      return out;
    }
  }

  /* Select a slot to operate on 🔪. Advance either the orphan iter or
     regular iter. */
  fd_forest_iter_t * iter = NULL;
  if( FD_UNLIKELY( fd_forest_reqslist_is_empty( fd_forest_reqslist( forest ), fd_forest_reqspool( forest ) ) ) ) {
    /* If the main tree has nothing to iterate at the moment, we can
       request down the ORPHAN trees on slots we know about. */
    iter = &forest->orphiter;
  } else {
    iter = &forest->iter;
  }

  fd_forest_iter_next( iter, forest );
  if( FD_UNLIKELY( fd_forest_iter_done( iter, forest ) ) ) {
    /* This happens when we have already requested all the shreds we
       know about. */
    return NULL;
  }

  fd_forest_blk_t * ele = fd_forest_pool_ele( pool, iter->ele_idx );
  if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
    /* When we are at the head of the turbine, we should give turbine the
       chance to complete the shreds. Agave waits 200ms from the
       estimated "correct time" of the highest shred received to repair.
       i.e. if we've received the first 200 shreds, the 200th has a tick
       of x. Translate that to millis, and we should wait to request shred
       201 until x + 200ms. If we have a hole, i.e. first 200 shreds
       receive except shred 100, and the 101th shred has a tick of y, we
       should wait until y + 200ms to request shred 100.

       Here we did not pass the timeout threshold, so we are not ready
       to repair this slot yet. But it's possible we have another fork
       that we need to repair... so we just should skip to the next SLOT
       in the main tree iterator. The likelihood that this ele is the
       head of turbine is high, which means that the shred_idx of the
       iterf is likely to be UINT_MAX, which means calling
       fd_forest_iter_next will advance the iterf to the next slot. */
    iter->shred_idx = UINT_MAX;
    /* TODO: Heinous... but the easiest way to ensure this slot gets
       added back to the requests deque is if we set the shred_idx to
       UINT_MAX, but maybe there should be an explicit API for it. */

    return NULL;
  }

  *charge_busy = 1;

  if( FD_UNLIKELY( iter->shred_idx == UINT_MAX ) ) {
    if( FD_UNLIKELY( ele->slot < highest_known_slot ) ) {
      /* We'll never know the highest shred for the current turbine
         slot, so there's no point in requesting it. */
      uint nonce = fd_rnonce_ss_compute( policy->rnonce_ss, 0, ele->slot, 0U, now );
      out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, nonce, ele->slot, 0 );
    }
  } else {
    uint nonce = fd_rnonce_ss_compute( policy->rnonce_ss, 1, ele->slot, iter->shred_idx, now );
    out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, nonce, ele->slot, iter->shred_idx );
    /* Record when we first had to repair this block (latency metric). */
    if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
  }
  return out;
}
265 :
266 : fd_policy_peer_t const *
267 0 : fd_policy_peer_upsert( fd_policy_t * policy, fd_pubkey_t const * key, fd_ip4_port_t const * addr ) {
268 0 : fd_policy_peer_map_t * peer_map = policy->peers.map;
269 0 : fd_policy_peer_t * pool = policy->peers.pool;
270 0 : fd_policy_peer_t * peer = fd_policy_peer_map_ele_query( peer_map, key, NULL, pool );
271 0 : if( FD_UNLIKELY( !peer && fd_policy_peer_pool_free( pool ) ) ) {
272 0 : peer = fd_policy_peer_pool_ele_acquire( pool );
273 0 : peer->key = *key;
274 0 : peer->ip4 = addr->addr;
275 0 : peer->port = addr->port;
276 0 : peer->req_cnt = 0;
277 0 : peer->res_cnt = 0;
278 0 : peer->first_req_ts = 0;
279 0 : peer->last_req_ts = 0;
280 0 : peer->first_resp_ts = 0;
281 0 : peer->last_resp_ts = 0;
282 0 : peer->total_lat = 0;
283 0 : peer->stake = 0;
284 0 : peer->ping = 0;
285 :
286 0 : fd_policy_peer_map_ele_insert( peer_map, peer, pool );
287 0 : fd_policy_peer_dlist_ele_push_tail( policy->peers.slow, peer, pool );
288 0 : return peer;
289 0 : }
290 0 : if( FD_LIKELY( peer ) ) {
291 0 : peer->ip4 = addr->addr;
292 0 : peer->port = addr->port;
293 0 : }
294 0 : return NULL;
295 0 : }
296 :
297 : fd_policy_peer_t *
298 0 : fd_policy_peer_query( fd_policy_t * policy, fd_pubkey_t const * key ) {
299 0 : if( FD_UNLIKELY( memcmp( key->key, null_pubkey.key, 32UL ) == 0 ) ) {
300 0 : FD_LOG_WARNING(( "Repair policy peer with null pubkey." ));
301 0 : return NULL;
302 0 : };
303 0 : fd_policy_peer_t * pool = policy->peers.pool;
304 0 : return fd_policy_peer_map_ele_query( policy->peers.map, key, NULL, pool );
305 0 : }
306 :
/* fd_policy_peer_remove stops tracking peer key: removes it from its
   latency bucket, the peer map, and the pool.  Returns 1 if the peer
   was removed, 0 if key was not tracked.  Safe to call while the
   round-robin selector is mid-iteration. */
int
fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) {
  fd_policy_peer_t * pool = policy->peers.pool;
  fd_policy_peer_t * peer = fd_policy_peer_map_ele_query( policy->peers.map, key, NULL, pool );
  if( FD_UNLIKELY( !peer ) ) return 0;

  if( FD_UNLIKELY( policy->peers.select.iter == fd_policy_peer_pool_idx( pool, peer ) ) ) {
    /* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */
    fd_policy_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? policy->peers.fast : policy->peers.slow;
    policy->peers.select.iter = fd_policy_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, pool );
  }

  /* The peer lives in the bucket implied by its current latency stats;
     remove it from there before releasing. */
  fd_policy_peer_dlist_t * bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
  fd_policy_peer_dlist_ele_remove( bucket, peer, pool );
  fd_policy_peer_map_ele_remove ( policy->peers.map, key, NULL, pool );
  fd_policy_peer_pool_ele_release( pool, peer );
  return 1;
}
325 :
326 : void
327 0 : fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to ) {
328 0 : fd_policy_peer_t * active = fd_policy_peer_query( policy, to );
329 0 : if( FD_LIKELY( active ) ) {
330 0 : active->req_cnt++;
331 0 : active->last_req_ts = fd_tickcount();
332 0 : if( FD_UNLIKELY( active->first_req_ts == 0 ) ) active->first_req_ts = active->last_req_ts;
333 0 : }
334 0 : }
335 :
336 : void
337 0 : fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt /* ns */ ) {
338 0 : fd_policy_peer_t * peer = fd_policy_peer_query( policy, to );
339 0 : if( FD_LIKELY( peer ) ) {
340 0 : long now = fd_tickcount();
341 0 : fd_policy_peer_dlist_t * prev_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
342 0 : peer->res_cnt++;
343 0 : if( FD_UNLIKELY( peer->first_resp_ts == 0 ) ) peer->first_resp_ts = now;
344 0 : peer->last_resp_ts = now;
345 0 : peer->total_lat += rtt;
346 0 : fd_policy_peer_dlist_t * new_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
347 :
348 0 : if( prev_bucket != new_bucket ) {
349 0 : fd_policy_peer_dlist_ele_remove ( prev_bucket, peer, policy->peers.pool );
350 0 : fd_policy_peer_dlist_ele_push_tail( new_bucket, peer, policy->peers.pool );
351 0 : }
352 0 : }
353 0 : }
354 :
/* fd_policy_set_turbine_slot0 records the first slot observed via
   turbine.  Slots below this value bypass the repair throttle in
   passes_throttle_threshold. */
void
fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot ) {
  policy->turbine_slot0 = slot;
}
359 :
|