Line data Source code
1 : #include "fd_gossip.h"
2 : #include "fd_bloom.h"
3 : #include "fd_gossip_message.h"
4 : #include "fd_gossip_txbuild.h"
5 : #include "fd_active_set.h"
6 : #include "fd_ping_tracker.h"
7 : #include "fd_prune_finder.h"
8 : #include "fd_gossip_wsample.h"
9 : #include "../../disco/keyguard/fd_keyguard.h"
10 : #include "../../ballet/sha256/fd_sha256.h"
11 : #include "../leaders/fd_leaders_base.h"
12 :
13 : FD_STATIC_ASSERT( FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT==FD_GOSSIP_MESSAGE_CNT,
14 : "FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT must match FD_GOSSIP_MESSAGE_CNT" );
15 :
16 : FD_STATIC_ASSERT( FD_METRICS_ENUM_CRDS_VALUE_CNT==FD_GOSSIP_VALUE_CNT,
17 : "FD_METRICS_ENUM_CRDS_VALUE_CNT must match FD_GOSSIP_VALUE_CNT" );
18 :
19 0 : #define BLOOM_FALSE_POSITIVE_RATE (0.1)
20 0 : #define BLOOM_NUM_KEYS (8.0)
21 :
22 : struct stake {
23 : fd_pubkey_t pubkey;
24 : ulong stake;
25 :
26 : struct {
27 : ulong prev;
28 : ulong next;
29 : } map;
30 :
31 : struct {
32 : ulong next;
33 : } pool;
34 : };
35 :
36 : typedef struct stake stake_t;
37 :
/* NOTE: Since the full set of stakes is supplied on every
   fd_gossip_stakes_update call, the pool and the map are rebuilt from
   scratch each time: both are reset wholesale and one pool element is
   then acquired per distinct identity pubkey.  No per-element release
   is needed on that path. */
43 : #define POOL_NAME stake_pool
44 0 : #define POOL_T stake_t
45 : #define POOL_IDX_T ulong
46 0 : #define POOL_NEXT pool.next
47 : #include "../../util/tmpl/fd_pool.c"
48 :
49 : #define MAP_NAME stake_map
50 0 : #define MAP_KEY pubkey
51 : #define MAP_ELE_T stake_t
52 : #define MAP_KEY_T fd_pubkey_t
53 0 : #define MAP_PREV map.prev
54 0 : #define MAP_NEXT map.next
55 0 : #define MAP_KEY_EQ(k0,k1) fd_pubkey_eq( k0, k1 )
56 0 : #define MAP_KEY_HASH(key,seed) (seed^fd_ulong_load_8( (key)->uc ))
57 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
58 : #include "../../util/tmpl/fd_map_chain.c"
59 :
60 : struct fd_gossip_private {
61 : uchar identity_pubkey[ 32UL ];
62 : ulong identity_stake;
63 :
64 : fd_gossip_metrics_t metrics[1];
65 :
66 : fd_gossip_wsample_t * wsample;
67 : fd_crds_t * crds;
68 : fd_gossip_purged_t * purged;
69 : fd_active_set_t * active_set;
70 : fd_ping_tracker_t * ping_tracker;
71 : fd_prune_finder_t * prune_finder;
72 :
73 : fd_sha256_t sha256[1];
74 : fd_sha512_t sha512[1];
75 :
76 : ulong entrypoints_cnt;
77 : fd_ip4_port_t entrypoints[ 16UL ];
78 :
79 : fd_rng_t * rng;
80 :
81 : struct {
82 : ulong count;
83 : stake_t * pool;
84 : stake_map_t * map;
85 : } stake;
86 :
87 : struct {
88 : long next_pull_request;
89 : long next_active_set_refresh;
90 : long next_contact_info_refresh;
91 : long next_flush_push_state;
92 : } timers;
93 :
94 : /* Token-bucket rate limiter for outbound pull response data.
95 : Matches Agave's DataBudget: replenished every 100ms with
96 : num_staked*1024 bytes, capped at 5x that amount. Only
97 : pull responses are rate-limited; push messages are not. */
98 : struct {
99 : ulong remaining; /* bytes remaining in budget (signed) */
100 : long last_replenish_nanos; /* last replenish timestamp in nanos */
101 : } outbound_budget;
102 :
103 : /* Callbacks */
104 : fd_gossip_sign_fn sign_fn;
105 : void * sign_ctx;
106 :
107 : fd_gossip_send_fn send_fn;
108 : void * send_ctx;
109 :
110 : fd_ping_tracker_change_fn ping_tracker_change_fn;
111 : void * ping_tracker_change_fn_ctx;
112 :
113 : struct {
114 : uchar crds_val[ FD_GOSSIP_VALUE_MAX_SZ ];
115 : ulong crds_val_sz;
116 : fd_gossip_value_t ci[1];
117 : } my_contact_info;
118 :
119 : fd_gossip_out_ctx_t * gossip_net_out;
120 : };
121 :
122 : FD_FN_CONST ulong
123 0 : fd_gossip_align( void ) {
124 0 : return 128uL;
125 0 : }
126 :
127 : FD_FN_CONST ulong
128 : fd_gossip_footprint( ulong max_values,
129 0 : ulong entrypoints_len ) {
130 0 : ulong l;
131 0 : l = FD_LAYOUT_INIT;
132 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) );
133 0 : l = FD_LAYOUT_APPEND( l, fd_gossip_purged_align(), fd_gossip_purged_footprint( max_values ) );
134 0 : l = FD_LAYOUT_APPEND( l, fd_gossip_wsample_align(),fd_gossip_wsample_footprint( FD_CONTACT_INFO_TABLE_SIZE ) );
135 0 : l = FD_LAYOUT_APPEND( l, fd_crds_align(), fd_crds_footprint( max_values ) );
136 0 : l = FD_LAYOUT_APPEND( l, fd_active_set_align(), fd_active_set_footprint() );
137 0 : l = FD_LAYOUT_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint( entrypoints_len ) );
138 0 : l = FD_LAYOUT_APPEND( l, fd_prune_finder_align(), fd_prune_finder_footprint() );
139 0 : l = FD_LAYOUT_APPEND( l, stake_pool_align(), stake_pool_footprint( MAX_STAKED_LEADERS ) );
140 0 : l = FD_LAYOUT_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt_est( MAX_STAKED_LEADERS ) ) );
141 0 : l = FD_LAYOUT_FINI( l, fd_gossip_align() );
142 0 : return l;
143 0 : }
144 :
145 : static void
146 : ping_tracker_change( void * _ctx,
147 : uchar const * peer_pubkey,
148 : fd_ip4_port_t peer_address,
149 : long now,
150 0 : int change_type ) {
151 0 : fd_gossip_t * ctx = (fd_gossip_t *)_ctx;
152 :
153 0 : if( FD_UNLIKELY( !memcmp( peer_pubkey, ctx->identity_pubkey, 32UL ) ) ) return;
154 :
155 0 : if( FD_LIKELY( change_type==FD_PING_TRACKER_CHANGE_TYPE_ACTIVE ) ) {
156 0 : fd_gossip_purged_drain_no_contact_info( ctx->purged, peer_pubkey );
157 0 : }
158 :
159 0 : ulong ci_idx = fd_crds_ci_idx( ctx->crds, peer_pubkey );
160 0 : if( FD_UNLIKELY( ci_idx!=ULONG_MAX ) ) {
161 0 : switch( change_type ) {
162 0 : case FD_PING_TRACKER_CHANGE_TYPE_ACTIVE:
163 0 : fd_gossip_wsample_ping_tracked( ctx->wsample, ci_idx, 1 );
164 0 : break;
165 0 : case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE:
166 0 : case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE_STAKED:
167 0 : fd_gossip_wsample_ping_tracked( ctx->wsample, ci_idx, 0 );
168 0 : fd_active_set_remove_peer( ctx->active_set, ci_idx );
169 0 : break;
170 0 : default: FD_LOG_ERR(( "Unknown change type %d", change_type )); return;
171 0 : }
172 0 : }
173 :
174 0 : ctx->ping_tracker_change_fn( ctx->ping_tracker_change_fn_ctx, peer_pubkey, peer_address, now, change_type );
175 0 : }
176 :
177 : static inline void
178 : refresh_contact_info( fd_gossip_t * gossip,
179 0 : long now ) {
180 0 : fd_memcpy( gossip->my_contact_info.ci->origin, gossip->identity_pubkey, 32UL );
181 0 : gossip->my_contact_info.ci->wallclock = (ulong)FD_NANOSEC_TO_MILLI( now );
182 0 : long sz = fd_gossip_value_serialize( gossip->my_contact_info.ci, gossip->my_contact_info.crds_val, FD_GOSSIP_VALUE_MAX_SZ );
183 0 : FD_TEST( sz!=-1L );
184 0 : gossip->my_contact_info.crds_val_sz = (ulong)sz;
185 :
186 0 : gossip->sign_fn( gossip->sign_ctx,
187 0 : gossip->my_contact_info.crds_val+64UL,
188 0 : gossip->my_contact_info.crds_val_sz-64UL,
189 0 : FD_KEYGUARD_SIGN_TYPE_ED25519,
190 0 : gossip->my_contact_info.crds_val );
191 :
192 : /* We don't have stem_ctx here so we pre-empt in next
193 : fd_gossip_advance iteration instead. */
194 0 : gossip->timers.next_contact_info_refresh = now;
195 0 : }
196 :
197 : void *
198 : fd_gossip_new( void * shmem,
199 : fd_rng_t * rng,
200 : ulong max_values,
201 : ulong entrypoints_len,
202 : fd_ip4_port_t const * entrypoints,
203 : uchar const * identity_pubkey,
204 : fd_gossip_contact_info_t const * my_contact_info,
205 : long now,
206 : fd_gossip_send_fn send_fn,
207 : void * send_ctx,
208 : fd_gossip_sign_fn sign_fn,
209 : void * sign_ctx,
210 : fd_ping_tracker_change_fn ping_tracker_change_fn,
211 : void * ping_tracker_change_fn_ctx,
212 : fd_gossip_activity_update_fn activity_update_fn,
213 : void * activity_update_fn_ctx,
214 : fd_gossip_out_ctx_t * gossip_update_out,
215 0 : fd_gossip_out_ctx_t * gossip_net_out ) {
216 0 : if( FD_UNLIKELY( !shmem ) ) {
217 0 : FD_LOG_WARNING(( "NULL shmem" ));
218 0 : return NULL;
219 0 : }
220 :
221 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_gossip_align() ) ) ) {
222 0 : FD_LOG_WARNING(( "misaligned shmem" ));
223 0 : return NULL;
224 0 : }
225 :
226 0 : if( FD_UNLIKELY( entrypoints_len>16UL ) ) {
227 0 : FD_LOG_WARNING(( "entrypoints_cnt must be in [0, 16]" ));
228 0 : return NULL;
229 0 : }
230 :
231 0 : if( FD_UNLIKELY( !fd_ulong_is_pow2( max_values ) ) ) {
232 0 : FD_LOG_WARNING(( "max_values must be a power of 2" ));
233 0 : return NULL;
234 0 : }
235 :
236 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
237 0 : fd_gossip_t * gossip = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) );
238 0 : void * purged = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_purged_align(), fd_gossip_purged_footprint( max_values ) );
239 0 : void * wsample = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_wsample_align(), fd_gossip_wsample_footprint( FD_CONTACT_INFO_TABLE_SIZE ) );
240 0 : void * crds = FD_SCRATCH_ALLOC_APPEND( l, fd_crds_align(), fd_crds_footprint( max_values ) );
241 0 : void * active_set = FD_SCRATCH_ALLOC_APPEND( l, fd_active_set_align(), fd_active_set_footprint() );
242 0 : void * ping_tracker = FD_SCRATCH_ALLOC_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint( entrypoints_len ) );
243 0 : void * prune_finder = FD_SCRATCH_ALLOC_APPEND( l, fd_prune_finder_align(), fd_prune_finder_footprint() );
244 0 : void * stake_pool = FD_SCRATCH_ALLOC_APPEND( l, stake_pool_align(), stake_pool_footprint( MAX_STAKED_LEADERS ) );
245 0 : void * stake_weights = FD_SCRATCH_ALLOC_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt_est( MAX_STAKED_LEADERS ) ) );
246 :
247 0 : gossip->gossip_net_out = gossip_net_out;
248 :
249 0 : gossip->entrypoints_cnt = entrypoints_len;
250 0 : fd_memcpy( gossip->entrypoints, entrypoints, entrypoints_len*sizeof(fd_ip4_port_t) );
251 :
252 0 : gossip->purged = fd_gossip_purged_join( fd_gossip_purged_new( purged, rng, max_values ) );
253 0 : FD_TEST( gossip->purged );
254 :
255 0 : gossip->wsample = fd_gossip_wsample_join( fd_gossip_wsample_new( wsample, rng, FD_CONTACT_INFO_TABLE_SIZE ) );
256 0 : FD_TEST( gossip->wsample );
257 :
258 0 : gossip->crds = fd_crds_join( fd_crds_new( crds, entrypoints, entrypoints_len, gossip->wsample, active_set, rng, max_values, gossip->purged, activity_update_fn, activity_update_fn_ctx, gossip_update_out ) );
259 0 : FD_TEST( gossip->crds );
260 :
261 0 : gossip->active_set = fd_active_set_join( fd_active_set_new( active_set, gossip->wsample, gossip->crds, rng, identity_pubkey, 0UL, send_fn, send_ctx ) );
262 0 : FD_TEST( gossip->active_set );
263 :
264 0 : gossip->ping_tracker = fd_ping_tracker_join( fd_ping_tracker_new( ping_tracker, rng, gossip->entrypoints_cnt, gossip->entrypoints, ping_tracker_change, gossip ) );
265 0 : FD_TEST( gossip->ping_tracker );
266 :
267 0 : gossip->prune_finder = fd_prune_finder_join( fd_prune_finder_new( prune_finder ) );
268 0 : FD_TEST( gossip->prune_finder );
269 :
270 0 : gossip->stake.count = 0UL;
271 0 : gossip->stake.pool = stake_pool_join( stake_pool_new( stake_pool, MAX_STAKED_LEADERS ) );
272 0 : FD_TEST( gossip->stake.pool );
273 :
274 0 : gossip->stake.map = stake_map_join( stake_map_new( stake_weights, stake_map_chain_cnt_est( MAX_STAKED_LEADERS ), fd_rng_ulong( rng ) ) );
275 0 : FD_TEST( gossip->stake.map );
276 :
277 0 : FD_TEST( fd_sha256_join( fd_sha256_new( gossip->sha256 ) ) );
278 0 : FD_TEST( fd_sha512_join( fd_sha512_new( gossip->sha512 ) ) );
279 :
280 0 : gossip->rng = rng;
281 :
282 0 : gossip->timers.next_pull_request = 0L;
283 0 : gossip->timers.next_active_set_refresh = 0L;
284 0 : gossip->timers.next_contact_info_refresh = 0L;
285 0 : gossip->timers.next_flush_push_state = 0L;
286 :
287 0 : gossip->outbound_budget.remaining = 0UL;
288 0 : gossip->outbound_budget.last_replenish_nanos = now;
289 :
290 0 : gossip->send_fn = send_fn;
291 0 : gossip->send_ctx = send_ctx;
292 0 : gossip->sign_fn = sign_fn;
293 0 : gossip->sign_ctx = sign_ctx;
294 0 : gossip->ping_tracker_change_fn = ping_tracker_change_fn;
295 0 : gossip->ping_tracker_change_fn_ctx = ping_tracker_change_fn_ctx;
296 :
297 0 : gossip->my_contact_info.ci->tag = FD_GOSSIP_VALUE_CONTACT_INFO;
298 0 : *gossip->my_contact_info.ci->contact_info = *my_contact_info;
299 0 : fd_memcpy( gossip->identity_pubkey, identity_pubkey, 32UL );
300 0 : gossip->identity_stake = 0UL;
301 0 : refresh_contact_info( gossip, now );
302 :
303 0 : fd_memset( gossip->metrics, 0, sizeof(fd_gossip_metrics_t) );
304 :
305 0 : return gossip;
306 0 : }
307 :
308 : fd_gossip_t *
309 0 : fd_gossip_join( void * shgossip ) {
310 0 : if( FD_UNLIKELY( !shgossip ) ) {
311 0 : FD_LOG_WARNING(( "NULL shgossip" ));
312 0 : return NULL;
313 0 : }
314 :
315 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgossip, fd_gossip_align() ) ) ) {
316 0 : FD_LOG_WARNING(( "misaligned shgossip" ));
317 0 : return NULL;
318 0 : }
319 :
320 0 : return (fd_gossip_t *)shgossip;
321 0 : }
322 :
323 : fd_gossip_metrics_t const *
324 0 : fd_gossip_metrics( fd_gossip_t const * gossip ) {
325 0 : return gossip->metrics;
326 0 : }
327 :
328 : fd_crds_metrics_t const *
329 0 : fd_gossip_crds_metrics( fd_gossip_t const * gossip ) {
330 0 : return fd_crds_metrics( gossip->crds );
331 0 : }
332 :
333 : fd_ping_tracker_metrics_t const *
334 0 : fd_gossip_ping_tracker_metrics( fd_gossip_t const * gossip ) {
335 0 : return fd_ping_tracker_metrics( gossip->ping_tracker );
336 0 : }
337 :
338 : fd_gossip_purged_metrics_t const *
339 0 : fd_gossip_purged_metrics2( fd_gossip_t const * gossip ) {
340 0 : return fd_gossip_purged_metrics( gossip->purged );
341 0 : }
342 :
343 : fd_active_set_metrics_t const *
344 0 : fd_gossip_active_set_metrics2( fd_gossip_t const * gossip ) {
345 0 : return fd_active_set_metrics( gossip->active_set );
346 0 : }
347 :
348 : static fd_ip4_port_t
349 0 : random_entrypoint( fd_gossip_t const * gossip ) {
350 0 : ulong idx = fd_rng_ulong_roll( gossip->rng, gossip->entrypoints_cnt );
351 0 : return gossip->entrypoints[ idx ];
352 0 : }
353 :
354 : ulong
355 : get_stake( fd_gossip_t const * gossip,
356 0 : uchar const * pubkey ) {
357 0 : stake_t const * entry = stake_map_ele_query_const( gossip->stake.map, (fd_pubkey_t const *)pubkey, NULL, gossip->stake.pool );
358 0 : if( FD_UNLIKELY( !entry ) ) return 0UL;
359 0 : return entry->stake;
360 0 : }
361 :
362 : void
363 : fd_gossip_set_identity( fd_gossip_t * gossip,
364 : uchar const * identity_pubkey,
365 0 : long now ) {
366 0 : int identity_changed = memcmp( gossip->identity_pubkey, identity_pubkey, 32UL );
367 0 : if( FD_UNLIKELY( !identity_changed ) ) return;
368 :
369 0 : ulong new_ci_idx = fd_crds_ci_idx( gossip->crds, identity_pubkey );
370 :
371 : /* The new identity may already exist in CRDS as a normal peer (active
372 : in the wsample and potentially present in the active set). We
373 : must deactivate it before updating identity_pubkey to maintain the
374 : invariant that our own identity is never sampleable. */
375 0 : if( FD_UNLIKELY( new_ci_idx!=ULONG_MAX ) ) fd_active_set_remove_peer( gossip->active_set, new_ci_idx );
376 :
377 0 : fd_memcpy( gossip->identity_pubkey, identity_pubkey, 32UL );
378 0 : gossip->identity_stake = get_stake( gossip, identity_pubkey );
379 0 : fd_gossip_wsample_set_identity( gossip->wsample, new_ci_idx );
380 0 : fd_gossip_wsample_self_stake( gossip->wsample, gossip->identity_stake );
381 0 : fd_active_set_set_identity( gossip->active_set, gossip->identity_pubkey, gossip->identity_stake );
382 0 : fd_prune_finder_set_identity( gossip->prune_finder, gossip->identity_pubkey, gossip->identity_stake );
383 0 : refresh_contact_info( gossip, now );
384 0 : }
385 :
386 : void
387 : fd_gossip_set_shred_version( fd_gossip_t * gossip,
388 : ushort shred_version,
389 0 : long now ) {
390 0 : gossip->my_contact_info.ci->contact_info->shred_version = shred_version;
391 0 : refresh_contact_info( gossip, now );
392 0 : }
393 :
394 : void
395 : fd_gossip_stakes_update( fd_gossip_t * gossip,
396 : fd_vote_stake_weight_t const * stake_weights,
397 0 : ulong stake_weights_cnt ) {
398 0 : stake_map_reset( gossip->stake.map );
399 0 : stake_pool_reset( gossip->stake.pool );
400 :
401 0 : for( ulong i=0UL; i<stake_weights_cnt; i++ ) {
402 0 : if( FD_UNLIKELY( fd_pubkey_eq( &stake_weights[i].id_key, &FD_DUMMY_ACCOUNT_PUBKEY ) ) ) continue;
403 0 : stake_t * entry;
404 0 : if( FD_UNLIKELY( (entry = stake_map_ele_query( gossip->stake.map, &stake_weights[i].id_key, NULL, gossip->stake.pool )) ) ) {
405 0 : entry->stake += stake_weights[ i ].stake;
406 0 : } else {
407 0 : entry = stake_pool_ele_acquire( gossip->stake.pool );
408 0 : fd_memcpy( entry->pubkey.uc, stake_weights[ i ].id_key.uc, 32UL );
409 0 : entry->stake = stake_weights[ i ].stake;
410 0 : stake_map_ele_insert( gossip->stake.map, entry, gossip->stake.pool );
411 0 : }
412 0 : }
413 :
414 0 : gossip->identity_stake = get_stake( gossip, gossip->identity_pubkey );
415 0 : fd_gossip_wsample_self_stake( gossip->wsample, gossip->identity_stake );
416 0 : fd_active_set_set_identity( gossip->active_set, gossip->identity_pubkey, gossip->identity_stake );
417 0 : fd_prune_finder_set_identity( gossip->prune_finder, gossip->identity_pubkey, gossip->identity_stake );
418 0 : gossip->stake.count = stake_pool_used( gossip->stake.pool );
419 0 : }
420 :
/* Outbound pull response budget (modelled on Agave's DataBudget for
   gossip).  Every BUDGET_REPLENISH_INTERVAL_NS the budget grows by
   num_staked*BUDGET_BYTES_PER_INTERVAL bytes, where num_staked is the
   number of staked identities but never less than BUDGET_MIN_STAKED.
   The accumulated budget is capped at BUDGET_MAX_MULTIPLE times that
   increment. */

#define BUDGET_REPLENISH_INTERVAL_NS (100L*1000L*1000L) /* 100 ms */
#define BUDGET_BYTES_PER_INTERVAL    (1024UL)           /* per staked validator */
#define BUDGET_MAX_MULTIPLE          (5UL)              /* max accumulation */
#define BUDGET_MIN_STAKED            (2UL)              /* floor for num_staked */
430 :
431 : /* Lazily replenish the outbound pull-response budget if at least
432 : BUDGET_REPLENISH_INTERVAL_NS have elapsed since last replenish.
433 : Returns current remaining budget in bytes. */
434 :
435 : static inline ulong
436 : outbound_budget_replenish( fd_gossip_t * gossip,
437 0 : long now ) {
438 0 : long elapsed = now-gossip->outbound_budget.last_replenish_nanos;
439 :
440 0 : if( FD_LIKELY( elapsed>=BUDGET_REPLENISH_INTERVAL_NS ) ) {
441 0 : ulong num_staked = fd_ulong_max( gossip->stake.count, BUDGET_MIN_STAKED );
442 0 : ulong increment = num_staked * BUDGET_BYTES_PER_INTERVAL;
443 0 : ulong cap = BUDGET_MAX_MULTIPLE * increment;
444 0 : ulong remaining = gossip->outbound_budget.remaining + increment;
445 0 : gossip->outbound_budget.remaining = fd_ulong_min( remaining, cap );
446 0 : gossip->outbound_budget.last_replenish_nanos = now;
447 0 : }
448 0 : return gossip->outbound_budget.remaining;
449 0 : }
450 :
451 : static inline void
452 : txbuild_flush( fd_gossip_t * gossip,
453 : fd_gossip_txbuild_t * txbuild,
454 : fd_stem_context_t * stem,
455 : fd_ip4_port_t dest_addr,
456 0 : long now ) {
457 0 : if( FD_UNLIKELY( !txbuild->crds_len ) ) return;
458 :
459 : /* Debit the outbound data budget (gossip payload bytes only, not
460 : including IP/UDP headers — matching Agave's DataBudget which
461 : operates on serialized gossip-layer packet sizes). */
462 0 : gossip->outbound_budget.remaining -= fd_ulong_min( txbuild->bytes_len, gossip->outbound_budget.remaining );
463 :
464 0 : gossip->send_fn( gossip->send_ctx, stem, txbuild->bytes, txbuild->bytes_len, &dest_addr, (ulong)now );
465 :
466 0 : gossip->metrics->message_tx[ txbuild->tag ]++;
467 0 : gossip->metrics->message_tx_bytes[ txbuild->tag ] += txbuild->bytes_len+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
468 0 : for( ulong i=0UL; i<txbuild->crds_len; i++ ) {
469 0 : gossip->metrics->crds_tx_pull_response[ txbuild->crds[ i ].tag ]++;
470 0 : gossip->metrics->crds_tx_pull_response_bytes[ txbuild->crds[ i ].tag ] += txbuild->crds[ i ].sz;
471 0 : }
472 :
473 0 : fd_gossip_txbuild_init( txbuild, gossip->identity_pubkey, txbuild->tag );
474 0 : }
475 :
476 : static void
477 : rx_pull_request( fd_gossip_t * gossip,
478 : fd_gossip_pull_request_t const * pr_view,
479 : fd_ip4_port_t peer_addr,
480 : fd_stem_context_t * stem,
481 0 : long now ) {
482 : /* Replenish and check outbound data budget. If the budget is
483 : exhausted, skip generating pull responses entirely. */
484 0 : if( FD_UNLIKELY( !outbound_budget_replenish( gossip, now ) ) ) return;
485 :
486 : /* When responding to a pull request, we skip CRDS entries whose
487 : wallclock is newer than the caller's wallclock + a random jitter.
488 : The jitter is drawn uniformly from [0, TIMEOUT/4) ms, matching
489 : Agave's behavior (CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS = 15000ms).
490 : This prevents all responders from consistently excluding the same
491 : set of very-recent CRDS values. */
492 0 : #define FD_GOSSIP_PULL_JITTER_BOUND_MS (15000UL/4UL)
493 :
494 : /* Generate a random jitter in [0, 3750) ms, added to the caller's
495 : wallclock. CRDS entries newer than this adjusted threshold are
496 : excluded from the response. The jitter prevents all responders
497 : from consistently excluding the same near-boundary entries,
498 : improving cluster-wide convergence of recent values. */
499 0 : ulong caller_wallclock_ms = pr_view->contact_info->wallclock;
500 0 : ulong jitter_ms = fd_rng_ulong_roll( gossip->rng, FD_GOSSIP_PULL_JITTER_BOUND_MS );
501 0 : ulong adjusted_wallclock_ms = caller_wallclock_ms + jitter_ms;
502 :
503 0 : ulong keys[ sizeof(pr_view->crds_filter->filter->keys)/sizeof(ulong) ];
504 0 : ulong bits[ sizeof(pr_view->crds_filter->filter->bits)/sizeof(ulong) ];
505 0 : fd_memcpy( keys, pr_view->crds_filter->filter->keys, sizeof(pr_view->crds_filter->filter->keys) );
506 0 : fd_memcpy( bits, pr_view->crds_filter->filter->bits, sizeof(pr_view->crds_filter->filter->bits) );
507 :
508 0 : fd_bloom_t filter[1];
509 0 : filter->keys_len = pr_view->crds_filter->filter->keys_len;
510 0 : filter->keys = keys;
511 :
512 0 : filter->bits_len = pr_view->crds_filter->filter->bits_len;
513 0 : filter->bits = bits;
514 :
515 0 : fd_gossip_txbuild_t pull_resp[1];
516 0 : fd_gossip_txbuild_init( pull_resp, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PULL_RESPONSE );
517 :
518 0 : uchar iter_mem[ 16UL ];
519 :
520 0 : for( fd_crds_mask_iter_t * it=fd_crds_mask_iter_init( gossip->crds, pr_view->crds_filter->mask, pr_view->crds_filter->mask_bits, iter_mem );
521 0 : !fd_crds_mask_iter_done( it, gossip->crds );
522 0 : it=fd_crds_mask_iter_next( it, gossip->crds ) ) {
523 0 : fd_crds_entry_t const * candidate = fd_crds_mask_iter_entry( it, gossip->crds );
524 :
525 : /* Skip CRDS entries whose originator wallclock is newer than the
526 : caller's wallclock + jitter. The caller hasn't had time to
527 : observe these values yet, so including them would be wasteful. */
528 0 : if( FD_UNLIKELY( fd_crds_entry_wallclock( candidate )>adjusted_wallclock_ms ) ) continue;
529 :
530 0 : if( FD_UNLIKELY( fd_bloom_contains( filter, fd_crds_entry_hash( candidate ), 32UL ) ) ) continue;
531 :
532 0 : uchar const * crds_val;
533 0 : ulong crds_size;
534 0 : fd_crds_entry_value( candidate, &crds_val, &crds_size );
535 0 : if( FD_UNLIKELY( !fd_gossip_txbuild_can_fit( pull_resp, crds_size ) ) ) txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
536 0 : fd_gossip_txbuild_append( pull_resp, crds_size, crds_val );
537 0 : if( FD_UNLIKELY( !gossip->outbound_budget.remaining ) ) break;
538 0 : }
539 :
540 0 : txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
541 0 : }
542 :
543 : static void
544 : rx_values( fd_gossip_t * gossip,
545 : ulong values_len,
546 : fd_gossip_value_t const * values,
547 : uchar const * payload,
548 : uchar const * failed,
549 : fd_stem_context_t * stem,
550 : long now,
551 0 : long results[ static 17UL ] ) {
552 0 : for( ulong i=0UL; i<values_len; i++ ) {
553 0 : fd_gossip_value_t const * value = &values[ i ];
554 :
555 0 : if( FD_UNLIKELY( failed[ i ] ) ) {
556 0 : uchar candidate_hash[ 32UL ];
557 0 : fd_sha256_hash( payload+value->offset, value->length, candidate_hash );
558 0 : if( FD_LIKELY( failed[ i ]==FD_GOSSIP_FAILED_NO_CONTACT_INFO ) ) fd_gossip_purged_insert_no_contact_info( gossip->purged, value->origin, candidate_hash, now );
559 0 : else fd_gossip_purged_insert_failed_insert( gossip->purged, candidate_hash, now );
560 0 : continue;
561 0 : }
562 :
563 0 : ulong origin_stake = get_stake( gossip, value->origin );
564 0 : int origin_ping_tracker_active = fd_ping_tracker_active( gossip->ping_tracker, value->origin );
565 0 : int is_me = !memcmp( value->origin, gossip->identity_pubkey, 32UL );
566 :
567 0 : results[ i ] = fd_crds_insert( gossip->crds, value, payload+value->offset, value->length, origin_stake, origin_ping_tracker_active, is_me, now, stem );
568 0 : if( FD_UNLIKELY( results[ i ] ) ) continue;
569 :
570 0 : if( FD_UNLIKELY( value->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
571 0 : fd_ip4_port_t origin_addr = {
572 0 : .addr = value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
573 0 : .port = value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port
574 0 : };
575 0 : if( FD_LIKELY( !is_me ) ) fd_ping_tracker_track( gossip->ping_tracker, value->origin, origin_stake, origin_addr, now );
576 :
577 : /* We just learned this peer's contact info. Drain any
578 : no_contact_info hashes associated with this origin from the
579 : purged set so peers re-send those CRDS values. */
580 0 : if( FD_LIKELY( fd_ping_tracker_active( gossip->ping_tracker, value->origin ) ) ) fd_gossip_purged_drain_no_contact_info( gossip->purged, value->origin );
581 0 : }
582 :
583 0 : fd_active_set_push( gossip->active_set, payload+value->offset, value->length, value->origin, origin_stake, stem, now, 0 );
584 0 : }
585 0 : }
586 :
587 : static void
588 : rx_pull_response( fd_gossip_t * gossip,
589 : fd_gossip_pull_response_t const * pull_response,
590 : uchar const * payload,
591 : uchar const * failed,
592 : fd_stem_context_t * stem,
593 0 : long now ) {
594 0 : long results[ 17UL ];
595 0 : rx_values( gossip, pull_response->values_len, pull_response->values, payload, failed, stem, now, results );
596 0 : for( ulong i=0UL; i<pull_response->values_len; i++ ) {
597 0 : if( FD_UNLIKELY( failed[ i ] ) ) continue;
598 0 : if( FD_LIKELY( !results[ i ] ) ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PULL_RESPONSE_IDX ]++;
599 0 : else if( results[ i ]<0L ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_STALE_IDX ]++;
600 0 : else gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_DUPLICATE_IDX ]++;
601 0 : }
602 0 : }
603 :
604 : /* tx_prune constructs, signs, and sends a prune message telling
605 : `relayer` to stop pushing CRDS values originating from `origin`.
606 :
607 : On-wire layout (bincode):
608 : Protocol tag 4 (FD_GOSSIP_MESSAGE_PRUNE = 3)
609 : sender pubkey 32 (= identity_pubkey, outer PruneMessage field)
610 : PruneData.pubkey 32 (= identity_pubkey)
611 : prunes_len 8
612 : prunes[1] 32
613 : signature 64
614 : destination 32
615 : wallclock 8
616 :
617 : The signable data (input to Ed25519 sign) is the PruneData fields
618 : excluding signature:
619 : prefix[26] + pubkey[32] + prunes_len[8] + prunes[32] + destination[32] + wallclock[8]
620 : This must match fd_keyguard_payload_matches_prune_data (106 + 32 bytes). */
621 :
622 : static void
623 : tx_prune( fd_gossip_t * gossip,
624 : uchar const * relayer,
625 : uchar const * origin,
626 : fd_stem_context_t * stem,
627 0 : long now ) {
628 0 : ulong ci_idx = fd_crds_ci_idx( gossip->crds, relayer );
629 0 : if( FD_UNLIKELY( ci_idx==ULONG_MAX ) ) return;
630 :
631 0 : fd_gossip_contact_info_t const * ci = fd_crds_ci( gossip->crds, ci_idx );
632 0 : fd_ip4_port_t dest_addr = {
633 0 : .addr = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
634 0 : .port = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port
635 0 : };
636 0 : if( FD_UNLIKELY( !dest_addr.addr || !dest_addr.port ) ) return;
637 :
638 0 : ulong wallclock = (ulong)FD_NANOSEC_TO_MILLI( now );
639 :
640 : /* Build the signable payload:
641 : prefix[26] + pubkey[32] + prunes_len[8] + prunes[32] + destination[32] + wallclock[8] */
642 0 : uchar signable[ 26UL + 32UL + 8UL + 32UL + 32UL + 8UL ];
643 0 : uchar * p = signable;
644 0 : FD_STORE( ulong, p, 18UL ); p += 8UL;
645 0 : fd_memcpy( p, "\xffSOLANA_PRUNE_DATA", 18UL ); p += 18UL;
646 0 : fd_memcpy( p, gossip->identity_pubkey, 32UL ); p += 32UL;
647 0 : FD_STORE( ulong, p, 1UL ); p += 8UL;
648 0 : fd_memcpy( p, origin, 32UL ); p += 32UL;
649 0 : fd_memcpy( p, relayer, 32UL ); p += 32UL;
650 0 : FD_STORE( ulong, p, wallclock ); p += 8UL;
651 :
652 0 : uchar signature[ 64UL ];
653 0 : gossip->sign_fn( gossip->sign_ctx, signable, sizeof(signable), FD_KEYGUARD_SIGN_TYPE_ED25519, signature );
654 :
655 : /* Build the on-wire packet:
656 : tag(4) + sender(32) + pubkey(32) + prunes_len(8) + prunes[32]
657 : + signature(64) + destination(32) + wallclock(8) */
658 0 : uchar pkt[ 4UL + 32UL + 32UL + 8UL + 32UL + 64UL + 32UL + 8UL ];
659 0 : uchar * q = pkt;
660 0 : FD_STORE( uint, q, FD_GOSSIP_MESSAGE_PRUNE ); q += 4UL;
661 0 : fd_memcpy( q, gossip->identity_pubkey, 32UL ); q += 32UL; /* sender */
662 0 : fd_memcpy( q, gossip->identity_pubkey, 32UL ); q += 32UL; /* PruneData.pubkey */
663 0 : FD_STORE( ulong, q, 1UL ); q += 8UL;
664 0 : fd_memcpy( q, origin, 32UL ); q += 32UL;
665 0 : fd_memcpy( q, signature, 64UL ); q += 64UL;
666 0 : fd_memcpy( q, relayer, 32UL ); q += 32UL;
667 0 : FD_STORE( ulong, q, wallclock ); q += 8UL;
668 :
669 0 : gossip->send_fn( gossip->send_ctx, stem, pkt, sizeof(pkt), &dest_addr, (ulong)now );
670 :
671 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PRUNE ]++;
672 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PRUNE ] += sizeof(pkt) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
673 0 : }
674 :
675 : static void
676 : tx_prunes( fd_gossip_t * gossip,
677 : fd_stem_context_t * stem,
678 0 : long now ) {
679 0 : uchar const * relayer;
680 0 : uchar const * origin;
681 0 : while( fd_prune_finder_pop_prune( gossip->prune_finder, &relayer, &origin ) ) {
682 0 : tx_prune( gossip, relayer, origin, stem, now );
683 0 : }
684 0 : }
685 :
686 : static void
687 : rx_push( fd_gossip_t * gossip,
688 : fd_gossip_push_t const * push,
689 : uchar const * payload,
690 : uchar const * failed,
691 : long now,
692 0 : fd_stem_context_t * stem ) {
693 0 : long results[ 17UL ];
694 0 : rx_values( gossip, push->values_len, push->values, payload, failed, stem, now, results );
695 :
696 0 : for( ulong i=0UL; i<push->values_len; i++ ) {
697 0 : if( FD_UNLIKELY( failed[ i ] ) ) continue;
698 0 : if( FD_LIKELY( !results[ i ] ) ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PUSH_IDX ]++;
699 0 : else if( results[ i ]<0L ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_STALE_IDX ]++;
700 0 : else gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_DUPLICATE_IDX ]++;
701 :
702 0 : ulong num_dups;
703 0 : if( FD_LIKELY( !results[ i ] ) ) num_dups = 0UL;
704 0 : else if( FD_UNLIKELY( results[ i ]<0L ) ) num_dups = ULONG_MAX; /* stale => never timely */
705 0 : else num_dups = (ulong)results[ i ];
706 :
707 0 : ulong origin_stake = get_stake( gossip, push->values[ i ].origin );
708 0 : fd_prune_finder_record( gossip->prune_finder, push->values[ i ].origin, origin_stake, push->from, get_stake( gossip, push->from ), num_dups );
709 0 : }
710 :
711 0 : tx_prunes( gossip, stem, now );
712 0 : }
713 :
714 : static void
715 : rx_prune( fd_gossip_t * gossip,
716 0 : fd_gossip_prune_t const * prune ) {
717 0 : for( ulong i=0UL; i<prune->prunes_len; i++ ) {
718 0 : fd_active_set_prune( gossip->active_set,
719 0 : prune->pubkey,
720 0 : prune->prunes[ i ],
721 0 : get_stake( gossip, prune->prunes[ i ] ) );
722 0 : }
723 0 : }
724 :
725 :
726 : static void
727 : rx_ping( fd_gossip_t * gossip,
728 : fd_gossip_ping_t const * ping,
729 : fd_ip4_port_t peer_address,
730 : fd_stem_context_t * stem,
731 0 : long now ) {
732 0 : uchar out_payload[ sizeof(fd_gossip_pong_t)+4UL];
733 0 : FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PONG );
734 :
735 0 : fd_gossip_pong_t * out_pong = (fd_gossip_pong_t *)(out_payload + 4UL);
736 0 : fd_memcpy( out_pong->from, gossip->identity_pubkey, 32UL );
737 :
738 : /* fd_keyguard checks payloads for certain patterns before performing the
739 : sign. Pattern-matching can't be done on hashed data, so we need
740 : to supply the pre-hashed image to the sign fn (fd_keyguard will hash when
741 : supplied with FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519) while also hashing
742 : the image ourselves onto pong->ping_hash */
743 :
744 0 : uchar pre_image[ 48UL ];
745 0 : fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
746 0 : fd_memcpy( pre_image+16UL, ping->token, 32UL );
747 :
748 0 : fd_sha256_hash( pre_image, 48UL, out_pong->hash );
749 :
750 0 : gossip->sign_fn( gossip->sign_ctx, pre_image, 48UL, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519, out_pong->signature );
751 0 : gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), &peer_address, (ulong)now );
752 :
753 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PONG ]++;
754 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PONG ] += sizeof(out_payload)+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
755 0 : }
756 :
757 : static void
758 : rx_pong( fd_gossip_t * gossip,
759 : fd_gossip_pong_t const * pong,
760 : fd_ip4_port_t peer_address,
761 0 : long now ) {
762 0 : ulong stake = get_stake( gossip, pong->from );
763 0 : fd_ping_tracker_register( gossip->ping_tracker, pong->from, stake, peer_address, pong->hash, now );
764 0 : }
765 :
766 : void
767 : fd_gossip_rx( fd_gossip_t * gossip,
768 : fd_ip4_port_t peer,
769 : uchar const * data,
770 : ulong data_sz,
771 : long now,
772 0 : fd_stem_context_t * stem ) {
773 : /* TODO: Implement traffic shaper / bandwidth limiter */
774 0 : FD_TEST( data_sz>=sizeof(fd_gossip_message_t)+FD_GOSSIP_MESSAGE_MAX_CRDS );
775 0 : fd_gossip_message_t const * message = (fd_gossip_message_t const *)data;
776 0 : uchar const * failed = data+sizeof(fd_gossip_message_t);
777 0 : uchar const * payload = data+sizeof(fd_gossip_message_t)+FD_GOSSIP_MESSAGE_MAX_CRDS;
778 :
779 0 : switch( message->tag ) {
780 0 : case FD_GOSSIP_MESSAGE_PULL_REQUEST: rx_pull_request( gossip, message->pull_request, peer, stem, now ); break;
781 0 : case FD_GOSSIP_MESSAGE_PULL_RESPONSE: rx_pull_response( gossip, message->pull_response, payload, failed, stem, now ); break;
782 0 : case FD_GOSSIP_MESSAGE_PUSH: rx_push( gossip, message->push, payload, failed, now, stem ); break;
783 0 : case FD_GOSSIP_MESSAGE_PRUNE: rx_prune( gossip, message->prune ); break;
784 0 : case FD_GOSSIP_MESSAGE_PING: rx_ping( gossip, message->ping, peer, stem, now ); break;
785 0 : case FD_GOSSIP_MESSAGE_PONG: rx_pong( gossip, message->pong, peer, now ); break;
786 0 : default:
787 0 : FD_LOG_CRIT(( "Unknown gossip message type %u", message->tag ));
788 0 : break;
789 0 : }
790 0 : }
791 :
792 : static int
793 : fd_gossip_push( fd_gossip_t * gossip,
794 : fd_gossip_value_t const * value,
795 : fd_stem_context_t * stem,
796 0 : long now ) {
797 0 : uchar serialized[ FD_GOSSIP_VALUE_MAX_SZ ];
798 0 : long serialized_sz = fd_gossip_value_serialize( value, serialized, sizeof(serialized) );
799 0 : FD_TEST( serialized_sz!=-1L );
800 0 : gossip->sign_fn( gossip->sign_ctx, serialized+64UL, (ulong)serialized_sz-64UL, FD_KEYGUARD_SIGN_TYPE_ED25519, serialized );
801 :
802 0 : int origin_active = 0; /* Value doesn't matter, since is_me=1 it's never used. */
803 0 : if( FD_UNLIKELY( fd_crds_insert( gossip->crds, value, serialized, (ulong)serialized_sz, gossip->identity_stake, origin_active, 1, now, stem ) ) ) return -1;
804 :
805 0 : fd_active_set_push( gossip->active_set, serialized, (ulong)serialized_sz, gossip->identity_pubkey, gossip->identity_stake, stem, now, 1 );
806 0 : return 0;
807 0 : }
808 :
809 : int
810 : fd_gossip_push_vote( fd_gossip_t * gossip,
811 : uchar const * txn,
812 : ulong txn_sz,
813 : fd_stem_context_t * stem,
814 0 : long now ) {
815 0 : fd_gossip_value_t value = {
816 0 : .tag = FD_GOSSIP_VALUE_VOTE,
817 0 : .wallclock = (ulong)FD_NANOSEC_TO_MILLI( now ),
818 0 : .vote = {{
819 0 : .index = 0UL, /* TODO */
820 0 : .transaction_len = txn_sz,
821 0 : }}
822 0 : };
823 0 : fd_memcpy( value.origin, gossip->identity_pubkey, 32UL );
824 0 : FD_TEST( txn_sz<=sizeof(value.vote->transaction) );
825 0 : fd_memcpy( value.vote->transaction, txn, txn_sz );
826 :
827 0 : return fd_gossip_push( gossip, &value, stem, now );
828 0 : }
829 :
830 : int
831 : fd_gossip_push_duplicate_shred( fd_gossip_t * gossip,
832 : fd_gossip_duplicate_shred_t const * duplicate_shred,
833 : fd_stem_context_t * stem,
834 0 : long now ) {
835 0 : fd_gossip_value_t value = {
836 0 : .tag = FD_GOSSIP_VALUE_DUPLICATE_SHRED,
837 0 : .wallclock = (ulong)FD_NANOSEC_TO_MILLI( now ),
838 0 : };
839 0 : fd_memcpy( value.origin, gossip->identity_pubkey, 32UL );
840 0 : *value.duplicate_shred = *duplicate_shred;
841 :
842 0 : return fd_gossip_push( gossip, &value, stem, now );
843 0 : }
844 :
845 : static void
846 : tx_ping( fd_gossip_t * gossip,
847 : fd_stem_context_t * stem,
848 : long now,
849 0 : int * charge_busy ) {
850 0 : uchar out_payload[ sizeof(fd_gossip_ping_t) + 4UL ];
851 0 : FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PING );
852 :
853 0 : fd_gossip_ping_t * out_ping = (fd_gossip_ping_t *)( out_payload+4UL );
854 0 : fd_memcpy( out_ping->from, gossip->identity_pubkey, 32UL );
855 :
856 0 : uchar const * peer_pubkey;
857 0 : uchar const * ping_token;
858 0 : fd_ip4_port_t const * peer_address;
859 0 : while( fd_ping_tracker_pop_request( gossip->ping_tracker,
860 0 : now,
861 0 : &peer_pubkey,
862 0 : &peer_address,
863 0 : &ping_token ) ) {
864 0 : fd_memcpy( out_ping->token, ping_token, 32UL );
865 :
866 0 : gossip->sign_fn( gossip->sign_ctx, out_ping->token, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519, out_ping->signature );
867 0 : gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), peer_address, (ulong)now );
868 :
869 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PING ]++;
870 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PING ] += sizeof(out_payload) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
871 0 : if( charge_busy ) *charge_busy = 1;
872 0 : }
873 0 : }
874 :
875 : /* Construct and send a pull request to a random peer. The pull
876 : request contains a bloom filter over our known CRDS hashes so that
877 : the peer can respond with values we are missing.
878 :
879 : NOTE: Divergence from Agave:
880 : - Agave builds up to 2^mask_bits filters per pull period
881 : (sampling up to 1024), each covering a distinct partition of
882 : the hash space. We build and send exactly one filter per
883 : pull period, covering 1/2^mask_bits of the space.
884 :
885 : Maximum bloom filter bits in a PullRequest packet:
886 :
887 : PACKET_DATA_SIZE = 1232 (= 1280 - 40 - 8)
888 :
889 : Bytes consumed by non-bloom fields:
890 : discriminant(4) + keys_len(8) + keys(8*num_keys) +
891 : has_bits(1) + bloom_vec_len(8) + bloom_bits_count(8) +
892 : bloom_num_bits_set(8) + mask(8) + mask_bits(4)
893 : + contact_info_crds_val(crds_val_sz)
894 : = 49 + 8*num_keys + crds_val_sz
895 :
896 : The bitvec is serialized as u64 words, so the bitvec storage is
897 : ceil(num_bits/64)*8 bytes. The remaining packet bytes must
898 : accommodate this.
899 :
900 : Agave determines the max_bytes parameter (input to Bloom::random)
901 : via an empirical cache (get_max_bloom_filter_bytes). max_bytes*8
902 : is passed as the max_bits cap to Bloom::random, but actual
903 : num_bits is only ~83% of max_bits (the E/D ratio for p=0.1).
904 : We replicate this with a closed-form inversion: the largest
905 : max_bytes where ceil(num_bits/64)*8 fits in remaining space is
906 : max_bytes = floor(D * floor(64*W/E) / 8), where W is the max
907 : number of u64 words, E and D are the bloom filter constants.
908 :
909 : num_keys depends on the bloom sizing, which depends on the
910 : overhead, which depends on num_keys. However there is a closed
911 : form: compute num_keys from the pessimistic KEYS=8 overhead, then
912 : recompute the tight overhead with the true num_keys. This always
913 : converges in one step because the optimal key count is
914 : E*ln(2) ≈ 3.32 (where E = ln(p)/ln(1/2^ln2)), far from any
915 : rounding boundary. For p=0.1 and KEYS=8, num_keys is always 3.
916 :
917 : NB: The has_bits(1) + bloom_vec_len(8) are only written when
918 : num_bits>=1. fd_bloom_num_bits clamps to [1, max_bits], so
919 : num_bits>=1 always holds and this layout is correct. */
920 :
921 : static void
922 : tx_pull_request( fd_gossip_t * gossip,
923 : fd_stem_context_t * stem,
924 0 : long now ) {
925 0 : ulong total_crds_vals = fd_crds_len( gossip->crds ) + fd_gossip_purged_len( gossip->purged );
926 0 : ulong num_items = fd_ulong_max( 65536UL, total_crds_vals );
927 0 : ulong crds_val_sz = gossip->my_contact_info.crds_val_sz;
928 :
929 : /* Step 1: Compute num_keys from the pessimistic KEYS=8 overhead
930 : (same initial estimate Agave uses in CrdsFilterSet::new). */
931 0 : ulong pessimistic_overhead = 49UL + 8UL*(ulong)BLOOM_NUM_KEYS + crds_val_sz;
932 0 : FD_TEST( pessimistic_overhead<FD_GOSSIP_MTU );
933 0 : double pessimistic_max_bits = (double)( 8UL*( FD_GOSSIP_MTU - pessimistic_overhead ) );
934 0 : double pessimistic_items = fd_bloom_max_items( pessimistic_max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
935 0 : FD_TEST( pessimistic_items>0.0 );
936 0 : ulong pessimistic_num_bits = fd_bloom_num_bits( pessimistic_items, BLOOM_FALSE_POSITIVE_RATE, pessimistic_max_bits );
937 0 : ulong num_keys = fd_bloom_num_keys( (double)pessimistic_num_bits, pessimistic_items );
938 :
939 : /* Step 2: Recompute with the tight overhead using the true num_keys.
940 : Find the largest max_bytes parameter (matching Agave's
941 : get_max_bloom_filter_bytes cache) such that the resulting bitvec
942 : fits in the remaining packet space.
943 :
944 : Given:
945 : max_items = ceil(max_bits / D) where D = -K / ln(1-exp(ln(p)/K))
946 : num_bits = ceil(max_items * E) where E = ln(p) / ln(1/2^ln2)
947 :
948 : We need ceil(num_bits/64)*8 <= remaining, i.e. num_bits <= 64*W
949 : where W = floor(remaining/8). Working backwards:
950 : max_items <= I where I = floor(64*W / E)
951 : max_bytes <= D*I / 8
952 :
953 : So max_bytes = floor(D * floor(64*W/E) / 8). */
954 0 : ulong overhead = 49UL + 8UL*num_keys + crds_val_sz;
955 0 : FD_TEST( overhead<FD_GOSSIP_MTU );
956 0 : ulong remaining = FD_GOSSIP_MTU - overhead;
957 0 : ulong max_words = remaining / 8UL; /* max u64 words for bitvec */
958 :
959 0 : double E = log( BLOOM_FALSE_POSITIVE_RATE ) / log( 1.0 / pow( 2.0, log( 2.0 ) ) );
960 0 : double D = -BLOOM_NUM_KEYS / log( 1.0 - exp( log( BLOOM_FALSE_POSITIVE_RATE ) / BLOOM_NUM_KEYS ) );
961 0 : ulong I = (ulong)floor( 64.0 * (double)max_words / E );
962 0 : ulong max_bytes = (ulong)floor( D * (double)I / 8.0 );
963 :
964 0 : double max_bits = (double)( max_bytes * 8UL );
965 0 : double max_items = fd_bloom_max_items( max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
966 0 : FD_TEST( max_items>0.0 );
967 0 : ulong num_bits = fd_bloom_num_bits( max_items, BLOOM_FALSE_POSITIVE_RATE, max_bits );
968 0 : FD_TEST( num_bits>=1UL );
969 0 : FD_TEST( (num_bits+63UL)/64UL<=max_words ); /* verify bitvec fits */
970 0 : FD_TEST( fd_bloom_num_keys( (double)num_bits, max_items )==num_keys ); /* verify convergence */
971 :
972 0 : double _mask_bits = ceil( log2( (double)num_items / max_items ) );
973 0 : uint mask_bits = _mask_bits >= 0.0 ? fd_uint_min( (uint)_mask_bits, 63U ) : 0U;
974 0 : ulong mask = fd_rng_ulong( gossip->rng ) | (~0UL>>(mask_bits));
975 :
976 0 : uchar payload[ FD_GOSSIP_MTU ] = {0};
977 :
978 0 : ulong * keys_ptr, * bits_ptr, * bits_set;
979 0 : long payload_sz = fd_gossip_pull_request_init( payload,
980 0 : FD_GOSSIP_MTU,
981 0 : num_keys,
982 0 : num_bits,
983 0 : mask,
984 0 : mask_bits,
985 0 : gossip->my_contact_info.crds_val,
986 0 : gossip->my_contact_info.crds_val_sz,
987 0 : &keys_ptr,
988 0 : &bits_ptr,
989 0 : &bits_set );
990 0 : FD_TEST( -1L!=payload_sz );
991 :
992 0 : fd_bloom_t filter[1];
993 0 : fd_bloom_init_inplace( keys_ptr, bits_ptr, num_keys, num_bits, 0, gossip->rng, BLOOM_FALSE_POSITIVE_RATE, filter );
994 :
995 0 : uchar iter_mem[ 16UL ];
996 0 : for( fd_crds_mask_iter_t * it = fd_crds_mask_iter_init( gossip->crds, mask, mask_bits, iter_mem );
997 0 : !fd_crds_mask_iter_done( it, gossip->crds );
998 0 : it = fd_crds_mask_iter_next( it, gossip->crds ) ) {
999 0 : fd_bloom_insert( filter, fd_crds_entry_hash( fd_crds_mask_iter_entry( it, gossip->crds ) ), 32UL );
1000 0 : }
1001 :
1002 0 : for( fd_gossip_purged_mask_iter_t * it = fd_gossip_purged_mask_iter_init( gossip->purged, mask, mask_bits, iter_mem );
1003 0 : !fd_gossip_purged_mask_iter_done( it, gossip->purged );
1004 0 : it = fd_gossip_purged_mask_iter_next( it, gossip->purged ) ){
1005 0 : fd_bloom_insert( filter, fd_gossip_purged_mask_iter_hash( it, gossip->purged ), 32UL );
1006 0 : }
1007 :
1008 0 : int num_bits_set = 0;
1009 0 : for( ulong i=0UL; i<(num_bits+63)/64UL; i++ ) num_bits_set += fd_ulong_popcnt( bits_ptr[ i ] );
1010 0 : *bits_set = (ulong)num_bits_set;
1011 :
1012 0 : ulong idx = fd_gossip_wsample_sample_pull_request( gossip->wsample );
1013 0 : fd_ip4_port_t peer_addr;
1014 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) {
1015 0 : if( FD_UNLIKELY( !gossip->entrypoints_cnt ) ) {
1016 : /* We are the bootstrapping node, and nobody else is present in
1017 : the cluster. Nowhere to send the pull request. */
1018 0 : return;
1019 0 : }
1020 0 : peer_addr = random_entrypoint( gossip );
1021 0 : } else {
1022 0 : fd_gossip_contact_info_t const * peer = fd_crds_ci( gossip->crds, idx );
1023 0 : peer_addr.addr = peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0 : peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4;
1024 0 : peer_addr.port = peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port;
1025 0 : }
1026 0 : gossip->send_fn( gossip->send_ctx, stem, payload, (ulong)payload_sz, &peer_addr, (ulong)now );
1027 :
1028 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PULL_REQUEST ]++;
1029 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PULL_REQUEST ] += (ulong)payload_sz + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
1030 0 : }
1031 :
1032 : void
1033 : fd_gossip_advance( fd_gossip_t * gossip,
1034 : long now,
1035 : fd_stem_context_t * stem,
1036 0 : int * charge_busy ) {
1037 0 : outbound_budget_replenish( gossip, now );
1038 :
1039 0 : fd_gossip_purged_expire( gossip->purged, now );
1040 0 : fd_active_set_advance( gossip->active_set, stem, now, charge_busy );
1041 0 : fd_crds_advance( gossip->crds, now, stem, charge_busy );
1042 :
1043 0 : tx_ping( gossip, stem, now, charge_busy );
1044 0 : if( FD_UNLIKELY( now>=gossip->timers.next_pull_request ) ) {
1045 0 : tx_pull_request( gossip, stem, now );
1046 0 : if( charge_busy ) *charge_busy = 1;
1047 : /* 1.6ms (625/s). Agave sends min(1024, ceil(2^mask_bits/8))
1048 : filters every 500ms. For a typical mainnet table (~65k items,
1049 : mask_bits≈7) that is ~16 filters/500ms = one every 31ms. We
1050 : send a single filter per round, so we fire ~20× more often to
1051 : compensate for sending one filter instead of many per period.
1052 :
1053 : We considered dynamically matching Agave's exact rate by
1054 : computing 500ms/filters_per_round from mask_bits each round,
1055 : but this caused slow table fill on startup (mask_bits starts
1056 : low -> long intervals -> few pulls -> slow CRDS population).
1057 : Adaptive boosting (counter-based, timestamp-based, and
1058 : threshold-based) all added complexity without clear benefit:
1059 : counter decay lost state between send and response arrival,
1060 : timestamp checks never disarmed because trickle inserts kept
1061 : refreshing the window, and threshold heuristics required
1062 : tuning constants that varied by cluster size.
1063 :
1064 : A fixed 1.6ms is simpler and robust: the cost of a redundant
1065 : pull request is negligible (a single 1232-byte packet whose
1066 : reply will be empty if we're already caught up), and it
1067 : guarantees fast table fill on startup without any adaptive
1068 : machinery. */
1069 0 : gossip->timers.next_pull_request = now+1600L*1000L;
1070 0 : }
1071 0 : if( FD_UNLIKELY( now>=gossip->timers.next_contact_info_refresh ) ) {
1072 : /* TODO: Frequency of this? More often if observing? */
1073 0 : refresh_contact_info( gossip, now );
1074 0 : int origin_active = 0; /* Value doesn't matter, since is_me=1 it's never used. */
1075 0 : fd_crds_insert( gossip->crds, gossip->my_contact_info.ci, gossip->my_contact_info.crds_val, gossip->my_contact_info.crds_val_sz, gossip->identity_stake, origin_active, 1, now, stem );
1076 0 : fd_active_set_push( gossip->active_set, gossip->my_contact_info.crds_val, gossip->my_contact_info.crds_val_sz, gossip->identity_pubkey, gossip->identity_stake, stem, now, 1 );
1077 0 : gossip->timers.next_contact_info_refresh = now+15L*500L*1000L*1000L; /* TODO: Jitter */
1078 0 : if( charge_busy ) *charge_busy = 1;
1079 0 : }
1080 0 : }
1081 :
1082 : void
1083 : fd_gossip_ping_tracker_track( fd_gossip_t * gossip,
1084 : uchar const * peer_pubkey,
1085 : fd_ip4_port_t peer_address,
1086 0 : long now ) {
1087 0 : ulong origin_stake = get_stake( gossip, peer_pubkey );
1088 0 : fd_ping_tracker_track( gossip->ping_tracker, peer_pubkey, origin_stake, peer_address, now );
1089 0 : }
|