Line data Source code
1 : #include "fd_sspeer_selector.h"
2 : #include "../../../util/log/fd_log.h"
3 :
4 : static int
5 : fd_sspeer_key_private_eq( fd_sspeer_key_t const * k0,
6 0 : fd_sspeer_key_t const * k1 ) {
7 0 : if( k0->is_url!=k1->is_url ) return 0;
8 0 : if( k0->is_url ) {
9 0 : return !strncmp( k0->url.hostname, k1->url.hostname, sizeof(k0->url.hostname) )
10 0 : && k0->url.resolved_addr.l==k1->url.resolved_addr.l;
11 0 : }
12 0 : return !memcmp( k0->pubkey, k1->pubkey, FD_PUBKEY_FOOTPRINT );
13 0 : }
14 :
15 : static ulong
16 : fd_sspeer_key_private_hash( fd_sspeer_key_t const * key,
17 0 : ulong seed ) {
18 0 : if( key->is_url ) {
19 : /* Use strnlen in case the string is not properly \0 terminated.
20 : Ideally, one would prefer sizeof(key->url.hostname) but that
21 : requires guaranteed zero-padding. */
22 0 : ulong h = fd_hash( seed, key->url.hostname, strnlen( key->url.hostname, sizeof(key->url.hostname) ) );
23 : /* fd_ip4_port_t is not a complete 64bit ulong, therefore compose
24 : the word from its parts to avoid random unused bytes. */
25 0 : ulong a = (ulong)key->url.resolved_addr.addr | ( ((ulong)key->url.resolved_addr.port) << 32 );
26 : /* Chaining "a" through fd_hash would give better avalanche
27 : properties, but it is probably overkill for a chain hash map. */
28 0 : return h ^ a;
29 0 : }
30 0 : return fd_hash( seed, key->pubkey, FD_PUBKEY_FOOTPRINT );
31 0 : }
32 :
33 : struct fd_sspeer_private {
34 : fd_sspeer_key_t key;
35 : fd_ip4_port_t addr;
36 : ulong full_slot;
37 : ulong incr_slot;
38 : uchar full_hash[ FD_HASH_FOOTPRINT ];
39 : uchar incr_hash[ FD_HASH_FOOTPRINT ];
40 : ulong latency;
41 : ulong score;
42 : int valid;
43 :
44 : struct {
45 : ulong next;
46 : } pool;
47 :
48 : struct {
49 : ulong next;
50 : ulong prev;
51 : } map_by_key;
52 :
53 : struct {
54 : ulong next;
55 : ulong prev;
56 : } map_by_addr;
57 :
58 : struct {
59 : ulong parent;
60 : ulong left;
61 : ulong right;
62 : ulong prio;
63 : } score_treap;
64 : };
65 :
66 : typedef struct fd_sspeer_private fd_sspeer_private_t;
67 :
68 : #define POOL_NAME peer_pool
69 0 : #define POOL_T fd_sspeer_private_t
70 : #define POOL_IDX_T ulong
71 0 : #define POOL_NEXT pool.next
72 : #include "../../../util/tmpl/fd_pool.c"
73 :
74 : #define MAP_NAME peer_map_by_key
75 0 : #define MAP_KEY key
76 0 : #define MAP_ELE_T fd_sspeer_private_t
77 : #define MAP_KEY_T fd_sspeer_key_t
78 0 : #define MAP_PREV map_by_key.prev
79 0 : #define MAP_NEXT map_by_key.next
80 0 : #define MAP_KEY_EQ(k0,k1) (fd_sspeer_key_private_eq(k0,k1))
81 0 : #define MAP_KEY_HASH(key,seed) (fd_sspeer_key_private_hash(key,seed))
82 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
83 : #include "../../../util/tmpl/fd_map_chain.c"
84 :
85 : #define MAP_NAME peer_map_by_addr
86 0 : #define MAP_KEY addr
87 0 : #define MAP_ELE_T fd_sspeer_private_t
88 : #define MAP_KEY_T fd_ip4_port_t
89 0 : #define MAP_PREV map_by_addr.prev
90 0 : #define MAP_NEXT map_by_addr.next
91 0 : #define MAP_KEY_EQ(k0,k1) ((k0)->l==(k1)->l)
92 0 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
93 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
94 : #define MAP_MULTI 1
95 : #include "../../../util/tmpl/fd_map_chain.c"
96 :
97 0 : #define COMPARE_WORSE(x,y) ( (x)->score<(y)->score )
98 :
99 : #define TREAP_T fd_sspeer_private_t
100 : #define TREAP_NAME score_treap
101 : #define TREAP_QUERY_T void * /* We don't use query ... */
102 : #define TREAP_CMP(a,b) (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
103 : implementation to cmp either */
104 0 : #define TREAP_IDX_T ulong
105 0 : #define TREAP_LT COMPARE_WORSE
106 0 : #define TREAP_PARENT score_treap.parent
107 0 : #define TREAP_LEFT score_treap.left
108 0 : #define TREAP_RIGHT score_treap.right
109 0 : #define TREAP_PRIO score_treap.prio
110 : #include "../../../util/tmpl/fd_treap.c"
111 :
/* Values substituted by the score function when a peer has not
   reported the corresponding quantity yet. */
#define DEFAULT_SLOTS_BEHIND (1000000UL) /* one million slots behind */
#define DEFAULT_PEER_LATENCY (100000000L) /* 100ms (presumably expressed in nanoseconds) */

/* Set to 1 to verify the score treap after every rescoring pass. */
#define FD_SSPEER_SELECTOR_DEBUG 0
116 :
117 : struct fd_sspeer_selector_private {
118 : fd_sspeer_private_t * pool;
119 : peer_map_by_key_t * map_by_key;
120 : peer_map_by_addr_t * map_by_addr;
121 : score_treap_t * score_treap;
122 : score_treap_t * shadow_score_treap;
123 : ulong * peer_idx_list;
124 : fd_sscluster_slot_t cluster_slot;
125 : int incremental_snapshot_fetch;
126 : ulong max_peers;
127 :
128 : ulong magic; /* ==FD_SSPEER_SELECTOR_MAGIC */
129 : };
130 :
131 : FD_FN_CONST ulong
132 0 : fd_sspeer_selector_align( void ) {
133 0 : return fd_ulong_max( alignof( fd_sspeer_selector_t), fd_ulong_max( peer_pool_align(),
134 0 : fd_ulong_max( peer_map_by_key_align(), fd_ulong_max( peer_map_by_addr_align(),
135 0 : fd_ulong_max( score_treap_align(), alignof(ulong) ) ) ) ) );
136 0 : }
137 :
138 : FD_FN_CONST ulong
139 0 : fd_sspeer_selector_footprint( ulong max_peers ) {
140 0 : ulong l;
141 0 : l = FD_LAYOUT_INIT;
142 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
143 0 : l = FD_LAYOUT_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
144 0 : l = FD_LAYOUT_APPEND( l, peer_map_by_key_align(), peer_map_by_key_footprint( peer_map_by_key_chain_cnt_est( 2UL*max_peers ) ) );
145 0 : l = FD_LAYOUT_APPEND( l, peer_map_by_addr_align(), peer_map_by_addr_footprint( peer_map_by_addr_chain_cnt_est( 2UL*max_peers ) ) );
146 0 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
147 0 : l = FD_LAYOUT_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
148 0 : l = FD_LAYOUT_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) );
149 0 : return FD_LAYOUT_FINI( l, fd_sspeer_selector_align() );
150 0 : }
151 :
152 : void *
153 : fd_sspeer_selector_new( void * shmem,
154 : ulong max_peers,
155 : int incremental_snapshot_fetch,
156 0 : ulong seed ) {
157 0 : if( FD_UNLIKELY( !shmem ) ) {
158 0 : FD_LOG_WARNING(( "NULL shmem" ));
159 0 : return NULL;
160 0 : }
161 :
162 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_sspeer_selector_align() ) ) ) {
163 0 : FD_LOG_WARNING(( "unaligned shmem" ));
164 0 : return NULL;
165 0 : }
166 :
167 0 : if( FD_UNLIKELY( max_peers < 1UL ) ) {
168 0 : FD_LOG_WARNING(( "max_peers must be at least 1" ));
169 0 : return NULL;
170 0 : }
171 :
172 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
173 0 : fd_sspeer_selector_t * selector = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
174 0 : void * _pool = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(), peer_pool_footprint( 2UL*max_peers ) );
175 0 : void * _map = FD_SCRATCH_ALLOC_APPEND( l, peer_map_by_key_align(), peer_map_by_key_footprint( peer_map_by_key_chain_cnt_est( 2UL*max_peers ) ) );
176 0 : void * _multimap_by_addr = FD_SCRATCH_ALLOC_APPEND( l, peer_map_by_addr_align(), peer_map_by_addr_footprint( peer_map_by_addr_chain_cnt_est( 2UL*max_peers ) ) );
177 0 : void * _score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
178 0 : void * _shadow_score_treap = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(), score_treap_footprint( max_peers ) );
179 0 : void * _peer_idx_list = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), max_peers * sizeof(ulong) );
180 :
181 0 : selector->pool = peer_pool_join( peer_pool_new( _pool, 2UL*max_peers ) );
182 0 : selector->map_by_key = peer_map_by_key_join( peer_map_by_key_new( _map, peer_map_by_key_chain_cnt_est( 2UL*max_peers ), seed ) );
183 0 : selector->map_by_addr = peer_map_by_addr_join( peer_map_by_addr_new( _multimap_by_addr, peer_map_by_addr_chain_cnt_est( 2UL*max_peers ), seed ) );
184 0 : selector->score_treap = score_treap_join( score_treap_new( _score_treap, max_peers ) );
185 0 : selector->shadow_score_treap = score_treap_join( score_treap_new( _shadow_score_treap, max_peers ) );
186 0 : selector->peer_idx_list = (ulong *)_peer_idx_list;
187 0 : selector->max_peers = max_peers;
188 :
189 0 : selector->cluster_slot.full = 0UL;
190 0 : selector->cluster_slot.incremental = 0UL;
191 0 : selector->incremental_snapshot_fetch = incremental_snapshot_fetch;
192 :
193 0 : FD_COMPILER_MFENCE();
194 0 : FD_VOLATILE( selector->magic ) = FD_SSPEER_SELECTOR_MAGIC;
195 0 : FD_COMPILER_MFENCE();
196 :
197 0 : return (void *)selector;
198 0 : }
199 :
200 : fd_sspeer_selector_t *
201 0 : fd_sspeer_selector_join( void * shselector ) {
202 0 : if( FD_UNLIKELY( !shselector ) ) {
203 0 : FD_LOG_WARNING(( "NULL shselector" ));
204 0 : return NULL;
205 0 : }
206 :
207 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
208 0 : FD_LOG_WARNING(( "misaligned shselector" ));
209 0 : return NULL;
210 0 : }
211 :
212 0 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
213 :
214 0 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
215 0 : FD_LOG_WARNING(( "bad magic" ));
216 0 : return NULL;
217 0 : }
218 :
219 0 : return selector;
220 0 : }
221 :
222 : void *
223 0 : fd_sspeer_selector_leave( fd_sspeer_selector_t * selector ) {
224 0 : if( FD_UNLIKELY( !selector ) ) {
225 0 : FD_LOG_WARNING(( "NULL selector" ));
226 0 : return NULL;
227 0 : }
228 :
229 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)selector, fd_sspeer_selector_align() ) ) ) {
230 0 : FD_LOG_WARNING(( "misaligned selector" ));
231 0 : return NULL;
232 0 : }
233 :
234 0 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
235 0 : FD_LOG_WARNING(( "bad magic" ));
236 0 : return NULL;
237 0 : }
238 :
239 0 : selector->pool = peer_pool_leave( selector->pool );
240 0 : selector->map_by_key = peer_map_by_key_leave( selector->map_by_key );
241 0 : selector->map_by_addr = peer_map_by_addr_leave( selector->map_by_addr );
242 0 : selector->score_treap = score_treap_leave( selector->score_treap );
243 0 : selector->shadow_score_treap = score_treap_leave( selector->shadow_score_treap );
244 :
245 0 : return (void *)selector;
246 0 : }
247 :
248 : void *
249 0 : fd_sspeer_selector_delete( void * shselector ) {
250 0 : if( FD_UNLIKELY( !shselector ) ) {
251 0 : FD_LOG_WARNING(( "NULL shselector" ));
252 0 : return NULL;
253 0 : }
254 :
255 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
256 0 : FD_LOG_WARNING(( "misaligned shselector" ));
257 0 : return NULL;
258 0 : }
259 :
260 0 : fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
261 :
262 0 : if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
263 0 : FD_LOG_WARNING(( "bad magic" ));
264 0 : return NULL;
265 0 : }
266 :
267 0 : selector->pool = peer_pool_delete( selector->pool );
268 0 : selector->map_by_key = peer_map_by_key_delete( selector->map_by_key );
269 0 : selector->map_by_addr = peer_map_by_addr_delete( selector->map_by_addr );
270 0 : selector->score_treap = score_treap_delete( selector->score_treap );
271 0 : selector->shadow_score_treap = score_treap_delete( selector->shadow_score_treap );
272 :
273 0 : FD_COMPILER_MFENCE();
274 0 : FD_VOLATILE( selector->magic ) = 0UL;
275 0 : FD_COMPILER_MFENCE();
276 :
277 0 : return (void *)selector;
278 0 : }
279 :
280 : /* Calculates a score for a peer given its latency and its resolved
281 : full and incremental slots */
282 : ulong
283 : fd_sspeer_selector_score( fd_sspeer_selector_t * selector,
284 : ulong peer_latency,
285 : ulong full_slot,
286 0 : ulong incr_slot ) {
287 0 : static const ulong slots_behind_penalty = 1000UL;
288 0 : ulong slot = ULONG_MAX;
289 0 : ulong slots_behind = DEFAULT_SLOTS_BEHIND;
290 0 : peer_latency = peer_latency!=ULONG_MAX ? peer_latency : DEFAULT_PEER_LATENCY;
291 :
292 0 : if( FD_LIKELY( full_slot!=ULONG_MAX ) ) {
293 0 : if( FD_UNLIKELY( incr_slot==ULONG_MAX ) ) {
294 0 : slot = full_slot;
295 0 : slots_behind = selector->cluster_slot.full>slot ? selector->cluster_slot.full - slot : 0UL;
296 0 : } else {
297 0 : slot = incr_slot;
298 0 : slots_behind = selector->cluster_slot.incremental>slot ? selector->cluster_slot.incremental - slot : 0UL;
299 0 : }
300 0 : }
301 :
302 : /* TODO: come up with a better/more dynamic score function */
303 0 : return peer_latency + slots_behind_penalty*slots_behind;
304 0 : }
305 :
306 : /* Updates a peer's score with new values for latency and/or resolved
307 : full/incremental slots */
308 : static void
309 : fd_sspeer_selector_update( fd_sspeer_selector_t * selector,
310 : fd_sspeer_private_t * peer,
311 : ulong latency,
312 : ulong full_slot,
313 : ulong incr_slot,
314 : uchar const full_hash[ FD_HASH_FOOTPRINT ],
315 0 : uchar const incr_hash[ FD_HASH_FOOTPRINT ] ) {
316 0 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
317 :
318 0 : ulong peer_latency = latency!=ULONG_MAX ? latency : peer->latency;
319 0 : ulong peer_full_slot = full_slot!=ULONG_MAX ? full_slot : peer->full_slot;
320 0 : ulong peer_incr_slot = incr_slot!=ULONG_MAX ? incr_slot : peer->incr_slot;
321 :
322 0 : peer->score = fd_sspeer_selector_score( selector, peer_latency, peer_full_slot, peer_incr_slot );
323 :
324 0 : peer->latency = peer_latency;
325 0 : peer->full_slot = peer_full_slot;
326 0 : peer->incr_slot = peer_incr_slot;
327 0 : if( FD_LIKELY( full_hash ) ) fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
328 0 : if( FD_LIKELY( incr_hash ) ) fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
329 :
330 0 : score_treap_ele_insert( selector->score_treap, peer, selector->pool );
331 0 : }
332 :
333 : int
334 : fd_sspeer_selector_update_on_resolve( fd_sspeer_selector_t * selector,
335 : fd_sspeer_key_t const * key,
336 : ulong full_slot,
337 : ulong incr_slot,
338 : uchar const full_hash[ FD_HASH_FOOTPRINT ],
339 0 : uchar const incr_hash[ FD_HASH_FOOTPRINT ] ) {
340 0 : if( FD_UNLIKELY( key==NULL ) ) return -1;
341 0 : fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
342 0 : if( FD_UNLIKELY( peer==NULL ) ) return -2;
343 0 : fd_sspeer_selector_update( selector, peer, ULONG_MAX, full_slot, incr_slot, full_hash, incr_hash );
344 0 : peer->valid = peer->full_slot!=ULONG_MAX;
345 0 : return 0;
346 0 : }
347 :
348 : ulong
349 : fd_sspeer_selector_update_on_ping( fd_sspeer_selector_t * selector,
350 : fd_ip4_port_t addr,
351 0 : ulong latency ) {
352 0 : ulong ele_idx = peer_map_by_addr_idx_query_const( selector->map_by_addr, &addr, ULONG_MAX, selector->pool );
353 0 : ulong cnt = 0UL;
354 0 : for(;;) {
355 0 : if( FD_UNLIKELY( ele_idx==ULONG_MAX ) ) break;
356 0 : fd_sspeer_private_t * peer = selector->pool + ele_idx;
357 0 : fd_sspeer_selector_update( selector, peer, latency, ULONG_MAX, ULONG_MAX, NULL, NULL );
358 0 : ele_idx = peer_map_by_addr_idx_next_const( ele_idx, ULONG_MAX, selector->pool );
359 0 : cnt++;
360 0 : }
361 0 : return cnt;
362 0 : }
363 :
364 : ulong
365 : fd_sspeer_selector_add( fd_sspeer_selector_t * selector,
366 : fd_sspeer_key_t const * key,
367 : fd_ip4_port_t addr,
368 : ulong latency,
369 : ulong full_slot,
370 : ulong incr_slot,
371 : uchar const full_hash[ FD_HASH_FOOTPRINT ],
372 0 : uchar const incr_hash[ FD_HASH_FOOTPRINT ] ) {
373 0 : if( FD_UNLIKELY( key==NULL ) ) return ULONG_MAX;
374 : /* A peer without a valid address cannot be added to the selector.
375 : For an existing peer changing from a valid address to 0, it is
376 : the caller's responsibility to remove them. */
377 0 : if( FD_UNLIKELY( !addr.l ) ) return ULONG_MAX;
378 :
379 0 : fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
380 0 : if( FD_LIKELY( peer ) ) {
381 0 : if( FD_UNLIKELY( peer->addr.l!=addr.l ) ) {
382 0 : peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
383 0 : peer->addr = addr;
384 0 : peer_map_by_addr_ele_insert( selector->map_by_addr, peer, selector->pool );
385 0 : }
386 0 : fd_sspeer_selector_update( selector, peer, latency, full_slot, incr_slot, full_hash, incr_hash );
387 0 : } else {
388 0 : if( FD_UNLIKELY( !peer_pool_free( selector->pool ) ) ) {
389 0 : FD_LOG_WARNING(( "peer selector pool exhausted" ));
390 0 : return ULONG_MAX;
391 0 : }
392 0 : if( FD_UNLIKELY( score_treap_ele_cnt(selector->score_treap)>=selector->max_peers ) ) {
393 0 : FD_LOG_WARNING(( "peer selector at max capacity" ));
394 0 : return ULONG_MAX;
395 0 : }
396 :
397 0 : peer = peer_pool_ele_acquire( selector->pool );
398 0 : peer->key = *key;
399 0 : peer->addr = addr;
400 0 : peer->latency = latency;
401 0 : peer->score = fd_sspeer_selector_score( selector, latency, full_slot, incr_slot );
402 0 : peer->full_slot = full_slot;
403 0 : peer->incr_slot = incr_slot;
404 0 : if( FD_LIKELY( full_hash ) ) fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
405 0 : else fd_memset( peer->full_hash, 0, FD_HASH_FOOTPRINT );
406 0 : if( FD_LIKELY( incr_hash ) ) fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
407 0 : else fd_memset( peer->incr_hash, 0, FD_HASH_FOOTPRINT );
408 0 : peer_map_by_key_ele_insert( selector->map_by_key, peer, selector->pool );
409 0 : peer_map_by_addr_ele_insert( selector->map_by_addr, peer, selector->pool );
410 0 : score_treap_ele_insert( selector->score_treap, peer, selector->pool );
411 0 : }
412 0 : peer->valid = peer->full_slot!=ULONG_MAX;
413 0 : return peer->score;
414 0 : }
415 :
416 : void
417 : fd_sspeer_selector_remove( fd_sspeer_selector_t * selector,
418 0 : fd_sspeer_key_t const * key ) {
419 0 : if( FD_UNLIKELY( key==NULL ) ) return;
420 0 : fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
421 0 : if( FD_UNLIKELY( peer==NULL ) ) return;
422 0 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
423 0 : peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
424 0 : peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
425 0 : peer_pool_ele_release( selector->pool, peer );
426 0 : }
427 :
428 : void
429 : fd_sspeer_selector_remove_by_addr( fd_sspeer_selector_t * selector,
430 0 : fd_ip4_port_t addr ) {
431 0 : for(;;) {
432 0 : fd_sspeer_private_t * peer = peer_map_by_addr_ele_remove( selector->map_by_addr, &addr, NULL, selector->pool );
433 0 : if( FD_UNLIKELY( peer==NULL ) ) break;
434 0 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
435 0 : peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
436 0 : peer_pool_ele_release( selector->pool, peer );
437 0 : }
438 0 : }
439 :
440 : fd_sspeer_t
441 : fd_sspeer_selector_best( fd_sspeer_selector_t * selector,
442 : int incremental,
443 0 : ulong base_slot ) {
444 0 : if( FD_UNLIKELY( incremental ) ) {
445 0 : FD_TEST( base_slot!=ULONG_MAX );
446 0 : }
447 :
448 0 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
449 0 : !score_treap_fwd_iter_done( iter );
450 0 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
451 0 : fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
452 0 : if( FD_LIKELY( peer->valid &&
453 0 : (!incremental ||
454 0 : (incremental && peer->full_slot==base_slot) ) ) ) {
455 0 : fd_sspeer_t best = {
456 0 : .addr = peer->addr,
457 0 : .full_slot = peer->full_slot,
458 0 : .incr_slot = peer->incr_slot,
459 0 : .score = peer->score,
460 0 : };
461 0 : fd_memcpy( best.full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
462 0 : fd_memcpy( best.incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
463 0 : return best;
464 0 : }
465 0 : }
466 :
467 0 : return (fd_sspeer_t){
468 0 : .addr = { .l=0UL },
469 0 : .full_slot = ULONG_MAX,
470 0 : .incr_slot = ULONG_MAX,
471 0 : .score = ULONG_MAX,
472 0 : .full_hash = {0},
473 0 : .incr_hash = {0},
474 0 : };
475 0 : }
476 :
477 : void
478 : fd_sspeer_selector_process_cluster_slot( fd_sspeer_selector_t * selector,
479 : ulong full_slot,
480 0 : ulong incr_slot ) {
481 0 : if( full_slot==ULONG_MAX && incr_slot==ULONG_MAX ) return;
482 :
483 0 : FD_TEST( full_slot!=ULONG_MAX );
484 0 : if( FD_LIKELY( selector->incremental_snapshot_fetch ) ) {
485 : /* incremental slot is less than or equal to cluster incremental slot */
486 0 : if( FD_UNLIKELY( incr_slot!=ULONG_MAX && selector->cluster_slot.incremental!=ULONG_MAX && incr_slot<=selector->cluster_slot.incremental ) ) return;
487 : /* incremental slot is less than or equal to cluster full slot when cluster incremental slot does not exist */
488 0 : else if( FD_UNLIKELY( incr_slot!=ULONG_MAX && selector->cluster_slot.incremental==ULONG_MAX && incr_slot<=selector->cluster_slot.full ) ) return;
489 : /* full slot is less than cluster full slot when incremental slot does not exist */
490 0 : else if( FD_UNLIKELY( incr_slot==ULONG_MAX && full_slot<=selector->cluster_slot.full ) ) return;
491 0 : } else {
492 0 : if( FD_UNLIKELY( full_slot<=selector->cluster_slot.full ) ) return;
493 0 : }
494 :
495 0 : selector->cluster_slot.full = full_slot;
496 0 : selector->cluster_slot.incremental = incr_slot;
497 :
498 0 : if( FD_UNLIKELY( score_treap_ele_cnt( selector->score_treap )==0UL ) ) return;
499 :
500 : /* Rescore all peers
501 : TODO: make more performant, maybe make a treap rebalance API */
502 0 : ulong idx = 0UL;
503 0 : for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
504 0 : !score_treap_fwd_iter_done( iter );
505 0 : iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
506 : /* Do not remove the peer from the treap while the iterator is
507 : running. Removing from peer_map(s) here is ok. */
508 0 : fd_sspeer_private_t * peer = score_treap_fwd_iter_ele( iter, selector->pool );
509 0 : fd_sspeer_private_t * shadow_peer = peer_pool_ele_acquire( selector->pool );
510 0 : shadow_peer->latency = peer->latency;
511 0 : shadow_peer->full_slot = peer->full_slot;
512 0 : shadow_peer->incr_slot = peer->incr_slot;
513 0 : shadow_peer->addr = peer->addr;
514 0 : shadow_peer->key = peer->key;
515 0 : shadow_peer->score = fd_sspeer_selector_score( selector, shadow_peer->latency, shadow_peer->full_slot, shadow_peer->incr_slot );
516 0 : shadow_peer->valid = peer->valid;
517 0 : fd_memcpy( shadow_peer->full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
518 0 : fd_memcpy( shadow_peer->incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
519 0 : score_treap_ele_insert( selector->shadow_score_treap, shadow_peer, selector->pool );
520 0 : selector->peer_idx_list[ idx++ ] = peer_pool_idx( selector->pool, peer );
521 0 : peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
522 0 : peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
523 0 : peer_map_by_key_ele_insert( selector->map_by_key, shadow_peer, selector->pool );
524 0 : peer_map_by_addr_ele_insert( selector->map_by_addr, shadow_peer, selector->pool );
525 0 : }
526 :
527 : /* clear score treap*/
528 0 : for( ulong i=0UL; i<idx; i++ ) {
529 0 : fd_sspeer_private_t * peer = peer_pool_ele( selector->pool, selector->peer_idx_list[ i ] );
530 0 : score_treap_ele_remove( selector->score_treap, peer, selector->pool );
531 0 : peer_pool_ele_release( selector->pool, peer );
532 0 : }
533 :
534 0 : score_treap_t * tmp = selector->score_treap;
535 0 : selector->score_treap = selector->shadow_score_treap;
536 0 : selector->shadow_score_treap = tmp;
537 :
538 : #if FD_SSPEER_SELECTOR_DEBUG
539 : FD_TEST( score_treap_verify( selector->score_treap, selector->pool )==0 );
540 : #endif
541 0 : }
542 :
543 : fd_sscluster_slot_t
544 0 : fd_sspeer_selector_cluster_slot( fd_sspeer_selector_t * selector ) {
545 0 : return selector->cluster_slot;
546 0 : }
547 :
548 : ulong
549 0 : fd_sspeer_selector_peer_map_by_key_ele_cnt( fd_sspeer_selector_t * selector ) {
550 0 : ulong cnt = 0UL;
551 0 : for( peer_map_by_key_iter_t iter = peer_map_by_key_iter_init( selector->map_by_key, selector->pool );
552 0 : !peer_map_by_key_iter_done( iter, selector->map_by_key, selector->pool );
553 0 : iter = peer_map_by_key_iter_next( iter, selector->map_by_key, selector->pool ) ) {
554 0 : cnt++;
555 0 : }
556 0 : return cnt;
557 0 : }
558 :
559 : ulong
560 0 : fd_sspeer_selector_peer_map_by_addr_ele_cnt( fd_sspeer_selector_t * selector ) {
561 0 : ulong cnt = 0UL;
562 0 : for( peer_map_by_addr_iter_t iter = peer_map_by_addr_iter_init( selector->map_by_addr, selector->pool );
563 0 : !peer_map_by_addr_iter_done( iter, selector->map_by_addr, selector->pool );
564 0 : iter = peer_map_by_addr_iter_next( iter, selector->map_by_addr, selector->pool ) ) {
565 0 : cnt++;
566 0 : }
567 0 : return cnt;
568 0 : }
|