Line data Source code
1 : #include "fd_active_set.h"
2 : #include "fd_gossip_txbuild.h"
3 : #include "fd_gossip_wsample.h"
4 : #include "fd_bloom.h"
5 : #include "../../util/net/fd_net_headers.h"
6 :
/* Capacity of the active set: a fixed number of stake entries
   (buckets), each holding a fixed number of peer slots.  The peers
   array is the flattened [entry][slot] table. */

#define FD_ACTIVE_SET_STAKE_ENTRIES   (25UL)
#define FD_ACTIVE_SET_PEERS_PER_ENTRY (12UL)
#define FD_ACTIVE_SET_MAX_PEERS       (FD_ACTIVE_SET_STAKE_ENTRIES*FD_ACTIVE_SET_PEERS_PER_ENTRY) /* 300 */
10 :
11 : struct fd_active_set_peer {
12 : long timestamp;
13 : ulong ci_idx;
14 : fd_bloom_t * bloom;
15 : fd_gossip_txbuild_t txbuild[1];
16 :
17 : struct {
18 : ulong prev;
19 : ulong next;
20 : } dlist;
21 : };
22 :
23 : typedef struct fd_active_set_peer fd_active_set_peer_t;
24 :
25 : #define DLIST_NAME push_dlist
26 : #define DLIST_ELE_T fd_active_set_peer_t
27 0 : #define DLIST_PREV dlist.prev
28 0 : #define DLIST_NEXT dlist.next
29 : #include "../../util/tmpl/fd_dlist.c"
30 :
31 : struct fd_active_set_entry {
32 : ulong nodes_idx; /* points to oldest entry in set */
33 : ulong nodes_len;
34 : };
35 :
36 : typedef struct fd_active_set_entry fd_active_set_entry_t;
37 :
38 : struct __attribute__((aligned(FD_ACTIVE_SET_ALIGN))) fd_active_set_private {
39 : fd_active_set_entry_t entries[ FD_ACTIVE_SET_STAKE_ENTRIES ][ 1 ];
40 : fd_active_set_peer_t peers[ FD_ACTIVE_SET_MAX_PEERS ];
41 :
42 : long next_rotate_nanos;
43 : ulong rotate_bucket; /* 0..24, round-robin */
44 :
45 : uchar identity_pubkey[ 32UL ];
46 : ulong identity_stake;
47 :
48 : fd_gossip_wsample_t * wsample;
49 : fd_crds_t * crds;
50 : fd_rng_t * rng;
51 : push_dlist_t * push_dlist;
52 :
53 : fd_gossip_send_fn send_fn;
54 : void * send_fn_ctx;
55 :
56 : fd_active_set_metrics_t metrics[1];
57 :
58 : ulong magic; /* ==FD_ACTIVE_SET_MAGIC */
59 : };
60 :
61 : FD_FN_CONST ulong
62 0 : fd_active_set_align( void ) {
63 0 : return FD_ACTIVE_SET_ALIGN;
64 0 : }
65 :
66 : FD_FN_CONST ulong
67 0 : fd_active_set_footprint( void ) {
68 0 : ulong l;
69 0 : l = FD_LAYOUT_INIT;
70 0 : l = FD_LAYOUT_APPEND( l, FD_ACTIVE_SET_ALIGN, sizeof(fd_active_set_t) );
71 0 : l = FD_LAYOUT_APPEND( l, FD_BLOOM_ALIGN, 25UL*12UL*fd_bloom_footprint( 0.1, 32768UL ) );
72 0 : l = FD_LAYOUT_APPEND( l, push_dlist_align(), push_dlist_footprint() );
73 0 : return FD_LAYOUT_FINI( l, FD_ACTIVE_SET_ALIGN );
74 0 : }
75 :
76 : void *
77 : fd_active_set_new( void * shmem,
78 : fd_gossip_wsample_t * wsample,
79 : fd_crds_t * crds,
80 : fd_rng_t * rng,
81 : uchar const * identity_pubkey,
82 : ulong identity_stake,
83 : fd_gossip_send_fn send_fn,
84 0 : void * send_fn_ctx ) {
85 0 : if( FD_UNLIKELY( !shmem ) ) {
86 0 : FD_LOG_WARNING(( "NULL shmem" ));
87 0 : return NULL;
88 0 : }
89 :
90 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_active_set_align() ) ) ) {
91 0 : FD_LOG_WARNING(( "misaligned shmem" ));
92 0 : return NULL;
93 0 : }
94 :
95 0 : ulong bloom_footprint = fd_bloom_footprint( 0.1, 32768UL );
96 :
97 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
98 0 : fd_active_set_t * as = FD_SCRATCH_ALLOC_APPEND( l, FD_ACTIVE_SET_ALIGN, sizeof(fd_active_set_t) );
99 0 : uchar * _blooms = FD_SCRATCH_ALLOC_APPEND( l, FD_BLOOM_ALIGN, 25UL*12UL*bloom_footprint );
100 0 : push_dlist_t * _push_dlist = FD_SCRATCH_ALLOC_APPEND( l, push_dlist_align(), push_dlist_footprint() );
101 :
102 0 : as->next_rotate_nanos = 0L;
103 0 : as->rotate_bucket = 0UL;
104 0 : fd_memcpy( as->identity_pubkey, identity_pubkey, 32UL );
105 0 : as->identity_stake = identity_stake;
106 :
107 0 : as->wsample = wsample;
108 0 : as->crds = crds;
109 0 : as->rng = rng;
110 0 : for( ulong i=0UL; i<25UL; i++ ) {
111 0 : fd_active_set_entry_t * entry = as->entries[ i ];
112 0 : entry->nodes_idx = 0UL;
113 0 : entry->nodes_len = 0UL;
114 :
115 0 : for( ulong j=0UL; j<12UL; j++ ) {
116 0 : fd_active_set_peer_t * peer = &as->peers[ i*12UL+j ];
117 0 : peer->bloom = fd_bloom_join( fd_bloom_new( _blooms, rng, 0.1, 32768UL ) );
118 0 : if( FD_UNLIKELY( !peer->bloom ) ) {
119 0 : FD_LOG_WARNING(( "failed to create bloom filter" ));
120 0 : return NULL;
121 0 : }
122 0 : _blooms += bloom_footprint;
123 0 : }
124 0 : }
125 :
126 0 : as->push_dlist = push_dlist_join( push_dlist_new( _push_dlist ) );
127 :
128 0 : as->send_fn = send_fn;
129 0 : as->send_fn_ctx = send_fn_ctx;
130 :
131 0 : memset( as->metrics, 0, sizeof(fd_active_set_metrics_t) );
132 :
133 0 : FD_COMPILER_MFENCE();
134 0 : FD_VOLATILE( as->magic ) = FD_ACTIVE_SET_MAGIC;
135 0 : FD_COMPILER_MFENCE();
136 :
137 0 : return (void *)as;
138 0 : }
139 :
140 : fd_active_set_t *
141 0 : fd_active_set_join( void * shas ) {
142 0 : if( FD_UNLIKELY( !shas ) ) {
143 0 : FD_LOG_WARNING(( "NULL shas" ));
144 0 : return NULL;
145 0 : }
146 :
147 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shas, fd_active_set_align() ) ) ) {
148 0 : FD_LOG_WARNING(( "misaligned shas" ));
149 0 : return NULL;
150 0 : }
151 :
152 0 : fd_active_set_t * as = (fd_active_set_t *)shas;
153 :
154 0 : if( FD_UNLIKELY( as->magic!=FD_ACTIVE_SET_MAGIC ) ) {
155 0 : FD_LOG_WARNING(( "bad magic" ));
156 0 : return NULL;
157 0 : }
158 :
159 0 : return as;
160 0 : }
161 :
162 : fd_active_set_metrics_t const *
163 0 : fd_active_set_metrics( fd_active_set_t const * active_set ) {
164 0 : return active_set->metrics;
165 0 : }
166 :
167 : void
168 : fd_active_set_set_identity( fd_active_set_t * active_set,
169 : uchar const * identity_pubkey,
170 0 : ulong identity_stake ) {
171 0 : fd_memcpy( active_set->identity_pubkey, identity_pubkey, 32UL );
172 0 : active_set->identity_stake = identity_stake;
173 0 : }
174 :
175 : void
176 : fd_active_set_prune( fd_active_set_t * active_set,
177 : uchar const * push_dest,
178 : uchar const * origin,
179 0 : ulong origin_stake ) {
180 0 : if( FD_UNLIKELY( !memcmp( active_set->identity_pubkey, origin, 32UL ) ) ) return;
181 :
182 0 : ulong bucket = fd_active_set_stake_bucket( fd_ulong_min( active_set->identity_stake, origin_stake ) );
183 0 : for( ulong i=0UL; i<active_set->entries[ bucket ]->nodes_len; i++ ) {
184 0 : ulong peer_idx = (active_set->entries[ bucket ]->nodes_idx+i) % 12UL;
185 0 : uchar const * peer_pubkey = fd_crds_ci_pubkey( active_set->crds, active_set->peers[ bucket*12UL+peer_idx ].ci_idx );
186 0 : if( FD_UNLIKELY( !memcmp( peer_pubkey, push_dest, 32UL ) ) ) {
187 0 : fd_bloom_insert( active_set->peers[ bucket*12UL+peer_idx ].bloom, origin, 32UL );
188 0 : return;
189 0 : }
190 0 : }
191 0 : }
192 :
193 : static void
194 : push_flush( fd_active_set_t * active_set,
195 : fd_active_set_peer_t * peer,
196 : fd_stem_context_t * stem,
197 0 : long now ) {
198 0 : if( FD_UNLIKELY( !peer->txbuild->crds_len ) ) return;
199 :
200 0 : fd_gossip_contact_info_t const * ci = fd_crds_ci( active_set->crds, peer->ci_idx );
201 : // TODO: Support ipv6, or prevent ending up in set
202 0 : fd_ip4_port_t dest_addr = {
203 0 : .addr = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
204 0 : .port = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port,
205 0 : };
206 :
207 0 : push_dlist_ele_remove( active_set->push_dlist, peer, active_set->peers );
208 :
209 0 : active_set->send_fn( active_set->send_fn_ctx, stem, peer->txbuild->bytes, peer->txbuild->bytes_len, &dest_addr, (ulong)now );
210 :
211 0 : active_set->metrics->message_tx[ peer->txbuild->tag ]++;
212 0 : active_set->metrics->message_tx_bytes[ peer->txbuild->tag ] += peer->txbuild->bytes_len+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
213 0 : for( ulong i=0UL; i<peer->txbuild->crds_len; i++ ) {
214 0 : active_set->metrics->crds_tx_push[ peer->txbuild->crds[ i ].tag ]++;
215 0 : active_set->metrics->crds_tx_push_bytes[ peer->txbuild->crds[ i ].tag ] += peer->txbuild->crds[ i ].sz;
216 0 : }
217 :
218 0 : fd_gossip_txbuild_init( peer->txbuild, active_set->identity_pubkey, FD_GOSSIP_MESSAGE_PUSH );
219 0 : }
220 :
221 : void
222 : fd_active_set_remove_peer( fd_active_set_t * active_set,
223 0 : ulong ci_idx ) {
224 0 : for( ulong b=0UL; b<25UL; b++ ) {
225 0 : fd_active_set_entry_t * entry = active_set->entries[ b ];
226 :
227 0 : for( ulong i=0UL; i<entry->nodes_len; i++ ) {
228 0 : ulong peer_idx = (entry->nodes_idx+i) % 12UL;
229 0 : if( FD_UNLIKELY( active_set->peers[ b*12UL+peer_idx ].ci_idx==ci_idx ) ) {
230 0 : fd_active_set_peer_t * peer = &active_set->peers[ b*12UL+peer_idx ];
231 0 : if( FD_UNLIKELY( peer->txbuild->crds_len ) ) push_dlist_ele_remove( active_set->push_dlist, peer, active_set->peers );
232 :
233 0 : for( ulong j=i; j<entry->nodes_len-1UL; j++ ) {
234 0 : ulong from_idx = b*12UL+(entry->nodes_idx+j+1UL) % 12UL;
235 0 : ulong to_idx = b*12UL+(entry->nodes_idx+j) % 12UL;
236 0 : fd_bloom_t * to_bloom = active_set->peers[ to_idx ].bloom;
237 0 : active_set->peers[ to_idx ] = active_set->peers[ from_idx ];
238 0 : active_set->peers[ from_idx ].bloom = to_bloom;
239 : /* If the moved element is in the push_dlist, fix up the
240 : dlist links so neighbors point to the new location.
241 : idx_replace reads prev/next from old_idx (from_idx, still
242 : intact) and patches neighbors + sentinel to reference
243 : to_abs instead. */
244 0 : if( FD_UNLIKELY( active_set->peers[ to_idx ].txbuild->crds_len ) ) push_dlist_idx_replace( active_set->push_dlist, to_idx, from_idx, active_set->peers );
245 0 : }
246 0 : entry->nodes_len--;
247 0 : if( FD_UNLIKELY( !entry->nodes_len ) ) entry->nodes_idx = 0UL;
248 0 : break;
249 0 : }
250 0 : }
251 0 : }
252 0 : }
253 :
254 : void
255 : fd_active_set_push( fd_active_set_t * active_set,
256 : uchar const * crds_val,
257 : ulong crds_sz,
258 : uchar const * origin_pubkey,
259 : ulong origin_stake,
260 : fd_stem_context_t * stem,
261 : long now,
262 0 : int flush_immediately ) {
263 0 : ulong stake_bucket = fd_active_set_stake_bucket( fd_ulong_min( active_set->identity_stake, origin_stake ) );
264 0 : fd_active_set_entry_t * entry = active_set->entries[ stake_bucket ];
265 :
266 0 : int originates_from_me = !memcmp( active_set->identity_pubkey, origin_pubkey, 32UL );
267 :
268 0 : for( ulong i=0UL; i<entry->nodes_len; i++ ) {
269 0 : fd_active_set_peer_t * peer = &active_set->peers[ stake_bucket*12UL+((entry->nodes_idx+i) % 12UL) ];
270 :
271 : /* If the value originated from us, we should always push it, even
272 : if theres a bloom filter hit, since bloom filters can have false
273 : positives and we don't want to accidentally not push our own
274 : values. */
275 0 : if( FD_UNLIKELY( fd_bloom_contains( peer->bloom, origin_pubkey, 32UL ) && !originates_from_me ) ) continue;
276 :
277 0 : if( FD_UNLIKELY( !fd_gossip_txbuild_can_fit( peer->txbuild, crds_sz ) ) ) push_flush( active_set, peer, stem, now );
278 0 : if( FD_UNLIKELY( !peer->txbuild->crds_len ) ) {
279 0 : peer->timestamp = now;
280 0 : push_dlist_ele_push_tail( active_set->push_dlist, peer, active_set->peers );
281 0 : }
282 0 : fd_gossip_txbuild_append( peer->txbuild, crds_sz, crds_val );
283 0 : if( FD_UNLIKELY( flush_immediately ) ) push_flush( active_set, peer, stem, now );
284 0 : }
285 0 : }
286 :
287 : static inline void
288 : rotate_active_set( fd_active_set_t * active_set,
289 : fd_stem_context_t * stem,
290 0 : long now ) {
291 0 : ulong num_bloom_filter_items = fd_ulong_max( fd_crds_peer_count( active_set->crds ), 512UL );
292 :
293 0 : ulong bucket = active_set->rotate_bucket;
294 0 : active_set->rotate_bucket = (active_set->rotate_bucket+1UL) % 25UL;
295 0 : fd_active_set_entry_t * entry = active_set->entries[ bucket ];
296 :
297 : /* Sample a new peer BEFORE evicting the oldest. This prevents the
298 : case where we evict a peer back into the sampler and then
299 : immediately re-sample it, creating a duplicate. */
300 :
301 0 : ulong added_ci_idx = fd_gossip_wsample_sample_remove_bucket( active_set->wsample, bucket );
302 0 : if( FD_UNLIKELY( added_ci_idx==ULONG_MAX ) ) return;
303 :
304 0 : ulong replace_idx;
305 0 : if( FD_LIKELY( entry->nodes_len==12UL ) ) {
306 0 : replace_idx = entry->nodes_idx;
307 0 : entry->nodes_idx = (entry->nodes_idx+1UL) % 12UL;
308 :
309 : /* Add the replaced peer back to the sampler. */
310 0 : ulong old_ci_idx = active_set->peers[ bucket*12UL+replace_idx ].ci_idx;
311 0 : fd_gossip_wsample_add_bucket( active_set->wsample, bucket, old_ci_idx );
312 0 : push_flush( active_set, &active_set->peers[ bucket*12UL+replace_idx ], stem, now );
313 0 : } else {
314 0 : replace_idx = (entry->nodes_idx+entry->nodes_len) % 12UL;
315 0 : }
316 :
317 0 : fd_active_set_peer_t * replace = &active_set->peers[ bucket*12UL+replace_idx ];
318 0 : replace->ci_idx = added_ci_idx;
319 0 : uchar const * new_pubkey = fd_crds_ci_pubkey( active_set->crds, added_ci_idx );
320 :
321 0 : fd_bloom_initialize( replace->bloom, num_bloom_filter_items );
322 0 : fd_bloom_insert( replace->bloom, new_pubkey, 32UL );
323 0 : entry->nodes_len = fd_ulong_min( entry->nodes_len+1UL, 12UL );
324 0 : fd_gossip_txbuild_init( replace->txbuild, active_set->identity_pubkey, FD_GOSSIP_MESSAGE_PUSH );
325 0 : }
326 :
327 :
328 : void
329 : fd_active_set_advance( fd_active_set_t * active_set,
330 : fd_stem_context_t * stem,
331 : long now,
332 0 : int * charge_busy ) {
333 0 : while( !push_dlist_is_empty( active_set->push_dlist, active_set->peers ) ) {
334 0 : fd_active_set_peer_t * head = push_dlist_ele_peek_head( active_set->push_dlist, active_set->peers );
335 0 : if( FD_LIKELY( head->timestamp>=now-1L*1000L*1000L ) ) break;
336 :
337 0 : push_flush( active_set, head, stem, now );
338 0 : if( charge_busy ) *charge_busy = 1;
339 0 : }
340 :
341 0 : if( FD_UNLIKELY( now>=active_set->next_rotate_nanos ) ) {
342 0 : rotate_active_set( active_set, stem, now );
343 0 : active_set->next_rotate_nanos = now+300L*1000L*1000L;
344 0 : if( charge_busy ) *charge_busy = 1;
345 0 : }
346 0 : }
|