Line data Source code
1 : #ifndef HEADER_fd_src_disco_stem_fd_stem_h 2 : #define HEADER_fd_src_disco_stem_fd_stem_h 3 : 4 : #include "../fd_disco_base.h" 5 : 6 0 : #define FD_STEM_SCRATCH_ALIGN (128UL) 7 : 8 : struct fd_stem_context { 9 : fd_frag_meta_t ** mcaches; 10 : ulong * seqs; 11 : ulong * depths; 12 : 13 : ulong * cr_avail; 14 : ulong * min_cr_avail; 15 : ulong cr_decrement_amount; 16 : int * out_reliable; 17 : }; 18 : 19 : typedef struct fd_stem_context fd_stem_context_t; 20 : 21 : struct __attribute__((aligned(64))) fd_stem_tile_in { 22 : fd_frag_meta_t const * mcache; /* local join to this in's mcache */ 23 : uint depth; /* == fd_mcache_depth( mcache ), depth of this in's cache (const) */ 24 : uint idx; /* index of this in in the list of providers, [0, in_cnt) */ 25 : ulong seq; /* sequence number of next frag expected from the upstream producer, 26 : updated when frag from this in is published */ 27 : fd_frag_meta_t const * mline; /* == mcache + fd_mcache_line_idx( seq, depth ), location to poll next */ 28 : ulong * fseq; /* local join to the fseq used to return flow control credits to the in */ 29 : uint accum[6]; /* local diagnostic accumulators. These are drained during in housekeeping. */ 30 : /* Assumes FD_FSEQ_DIAG_{PUB_CNT,PUB_SZ,FILT_CNT,FILT_SZ,OVRNP_CNT,OVRNP_FRAG_CNT} are 0:5 */ 31 : }; 32 : 33 : typedef struct fd_stem_tile_in fd_stem_tile_in_t; 34 : 35 : static inline void 36 : fd_stem_publish( fd_stem_context_t * stem, 37 : ulong out_idx, 38 : ulong sig, 39 : ulong chunk, 40 : ulong sz, 41 : ulong ctl, 42 : ulong tsorig, 43 0 : ulong tspub ) { 44 0 : ulong * seqp = &stem->seqs[ out_idx ]; 45 0 : ulong seq = *seqp; 46 0 : fd_mcache_publish( stem->mcaches[ out_idx ], stem->depths[ out_idx ], seq, sig, chunk, sz, ctl, tsorig, tspub ); 47 0 : if( FD_LIKELY( stem->out_reliable[ out_idx ] ) ) { 48 0 : if( FD_UNLIKELY( stem->cr_avail[ out_idx ]<stem->cr_decrement_amount ) ) { /* Ensure producer BURST is set correctly */ 49 0 : FD_LOG_ERR(( "BURST underprovisioned out_idx=%lu cr_avail=%lu min_cr_avail=%lu cr_decrement_amount=%lu", out_idx, stem->cr_avail[ out_idx ], *stem->min_cr_avail, stem->cr_decrement_amount )); 50 0 : } 51 0 : stem->cr_avail[ out_idx ] -= stem->cr_decrement_amount; 52 0 : *stem->min_cr_avail = fd_ulong_min( stem->cr_avail[ out_idx ], *stem->min_cr_avail ); 53 0 : } 54 0 : *seqp = fd_seq_inc( seq, 1UL ); 55 0 : } 56 : 57 : static inline ulong 58 : fd_stem_advance( fd_stem_context_t * stem, 59 0 : ulong out_idx ) { 60 0 : ulong * seqp = &stem->seqs[ out_idx ]; 61 0 : ulong seq = *seqp; 62 0 : if( FD_LIKELY( stem->out_reliable[ out_idx ] ) ) { 63 0 : stem->cr_avail[ out_idx ] -= stem->cr_decrement_amount; 64 0 : *stem->min_cr_avail = fd_ulong_min( stem->cr_avail[ out_idx ], *stem->min_cr_avail ); 65 0 : } 66 0 : *seqp = fd_seq_inc( seq, 1UL ); 67 0 : return seq; 68 0 : } 69 : 70 : #endif /* HEADER_fd_src_disco_stem_fd_stem_h */