Line data Source code
1 : #include "fd_circq.h" 2 : 3 : #include "../../util/log/fd_log.h" 4 : 5 : struct __attribute__((aligned(8UL))) fd_circq_message_private { 6 : ulong align; 7 : ulong footprint; 8 : 9 : /* Offset withn the circular buffer data region of where the next 10 : message starts, if there is one. This is not always the same as 11 : aligning up this message + footprint, because the next message may 12 : have wrapped around to the start of the buffer. */ 13 : ulong next; 14 : }; 15 : 16 : typedef struct fd_circq_message_private fd_circq_message_t; 17 : 18 : FD_FN_CONST ulong 19 0 : fd_circq_align( void ) { 20 0 : return FD_CIRCQ_ALIGN; 21 0 : } 22 : 23 : FD_FN_CONST ulong 24 0 : fd_circq_footprint( ulong sz ) { 25 0 : return sizeof( fd_circq_t ) + sz; 26 0 : } 27 : 28 : void * 29 : fd_circq_new( void * shmem, 30 0 : ulong sz ) { 31 0 : fd_circq_t * circq = (fd_circq_t *)shmem; 32 0 : circq->cnt = 0UL; 33 0 : circq->head = 0UL; 34 0 : circq->tail = 0UL; 35 0 : circq->size = sz; 36 0 : circq->cursor = ULONG_MAX; 37 0 : circq->cursor_seq = 0UL; 38 0 : circq->cursor_push_seq = 0UL; 39 : 40 0 : memset( &circq->metrics, 0, sizeof( circq->metrics ) ); 41 : 42 0 : return shmem; 43 0 : } 44 : 45 : fd_circq_t * 46 0 : fd_circq_join( void * shbuf ) { 47 0 : return (fd_circq_t *)shbuf; 48 0 : } 49 : 50 : void * 51 0 : fd_circq_leave( fd_circq_t * buf ) { 52 0 : return (void *)buf; 53 0 : } 54 : 55 : void * 56 0 : fd_circq_delete( void * shbuf ) { 57 0 : return shbuf; 58 0 : } 59 : 60 : static inline void FD_FN_UNUSED 61 0 : verify( fd_circq_t * circq ) { 62 0 : FD_TEST( circq->head<circq->size ); 63 0 : FD_TEST( circq->tail<circq->size ); 64 0 : FD_TEST( circq->tail!=circq->head || circq->cnt<=1 ); 65 0 : if( !circq->cnt ) { 66 0 : FD_TEST( circq->head==0UL ); 67 0 : FD_TEST( circq->tail==0UL ); 68 0 : } else if( circq->cnt==1UL ) { 69 0 : FD_TEST( circq->head==circq->tail ); 70 0 : } 71 0 : 72 0 : uchar * buf = (uchar *)(circq+1); 73 0 : 74 0 : ulong current = circq->head; 75 0 : int wrapped = 0; 76 0 : for( ulong i=0UL; i<circq->cnt; i++ ) { 77 0 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+current); 78 0 : ulong start = current; 79 0 : ulong end = fd_ulong_align_up( start+sizeof( fd_circq_message_t ), message->align ) + message->footprint; 80 0 : if( wrapped ) FD_TEST( end<=circq->head ); 81 0 : FD_TEST( start<end ); 82 0 : FD_TEST( end<=circq->size ); 83 0 : current = message->next; 84 0 : if( current<start ) wrapped = 1; 85 0 : } 86 0 : } 87 : 88 : static void 89 : evict( fd_circq_t * circq, 90 : ulong from, 91 0 : ulong to ) { 92 0 : uchar * buf = (uchar *)(circq+1); 93 : 94 0 : for(;;) { 95 0 : if( FD_UNLIKELY( !circq->cnt ) ) return; 96 : 97 0 : fd_circq_message_t * head = (fd_circq_message_t *)(buf+circq->head); 98 : 99 0 : ulong start = circq->head; 100 0 : ulong end = fd_ulong_align_up( start + sizeof( fd_circq_message_t ), head->align ) + head->footprint; 101 : 102 0 : if( FD_UNLIKELY( (start<to && end>from) ) ) { 103 0 : circq->cnt--; 104 0 : circq->metrics.drop_cnt++; 105 0 : if( FD_LIKELY( !circq->cnt ) ) circq->head = circq->tail = 0UL; 106 0 : else circq->head = head->next; 107 0 : } else { 108 0 : break; 109 0 : } 110 0 : } 111 0 : } 112 : 113 : uchar * 114 : fd_circq_push_back( fd_circq_t * circq, 115 : ulong align, 116 0 : ulong footprint ) { 117 0 : if( FD_UNLIKELY( !fd_ulong_is_pow2( align ) ) ) { 118 0 : FD_LOG_WARNING(( "align must be a power of 2" )); 119 0 : return NULL; 120 0 : } 121 0 : if( FD_UNLIKELY( align>FD_CIRCQ_ALIGN ) ) { 122 0 : FD_LOG_WARNING(( "align must be at most %lu", FD_CIRCQ_ALIGN )); 123 0 : return NULL; 124 0 : } 125 : 126 0 : ulong required = fd_ulong_align_up( sizeof( fd_circq_message_t ), align ) + footprint; 127 0 : if( FD_UNLIKELY( required>circq->size ) ) { 128 0 : FD_LOG_WARNING(( "tried to push message which was too large %lu>%lu", required, circq->size )); 129 0 : return NULL; 130 0 : } 131 : 132 0 : uchar * buf = (uchar *)(circq+1); 133 : 134 0 : ulong current = 0UL; 135 0 : fd_circq_message_t * message = NULL; 136 0 : if( FD_LIKELY( circq->cnt ) ) { 137 0 : message = (fd_circq_message_t *)(buf+circq->tail); 138 0 : current = fd_ulong_align_up( fd_ulong_align_up( circq->tail+sizeof( fd_circq_message_t ), message->align )+message->footprint, alignof( fd_circq_message_t ) ); 139 0 : } 140 : 141 0 : if( FD_UNLIKELY( current+required>circq->size ) ) { 142 0 : evict( circq, current, circq->size ); 143 0 : evict( circq, 0UL, required ); 144 : 145 0 : circq->tail = 0UL; 146 0 : if( FD_LIKELY( circq->cnt && message ) ) message->next = 0UL; 147 0 : } else { 148 0 : evict( circq, current, current+required ); 149 : 150 0 : circq->tail = current; 151 0 : if( FD_LIKELY( circq->cnt && message ) ) message->next = current; 152 0 : } 153 : 154 0 : circq->cnt++; 155 0 : fd_circq_message_t * next_message = (fd_circq_message_t *)(buf+circq->tail); 156 0 : next_message->align = align; 157 0 : next_message->footprint = footprint; 158 0 : circq->cursor_push_seq++; 159 0 : return (uchar *)(next_message+1); 160 0 : } 161 : 162 : void 163 : fd_circq_resize_back( fd_circq_t * circq, 164 0 : ulong new_footprint ) { 165 0 : FD_TEST( circq->cnt ); 166 : 167 0 : uchar * buf = (uchar *)(circq+1); 168 0 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->tail); 169 0 : FD_TEST( new_footprint<=message->footprint ); 170 : 171 0 : message->footprint = new_footprint; 172 0 : } 173 : 174 : uchar const * 175 : fd_circq_cursor_advance( fd_circq_t * circq, 176 0 : ulong * msg_sz ) { 177 : /* First call or after reset - start from head */ 178 0 : if( FD_UNLIKELY( circq->cursor==ULONG_MAX ) ) { 179 0 : if( FD_UNLIKELY( !circq->cnt ) ) return NULL; 180 0 : circq->cursor = circq->head; 181 0 : circq->cursor_seq = circq->cursor_push_seq - circq->cnt; 182 0 : } else { 183 : /* Already iterating - move to next */ 184 0 : if( FD_UNLIKELY( circq->cursor_seq >= circq->cursor_push_seq ) ) return NULL; 185 : 186 0 : uchar * buf = (uchar *)(circq+1); 187 0 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->cursor); 188 0 : circq->cursor = message->next; 189 0 : } 190 : 191 0 : uchar * buf = (uchar *)(circq+1); 192 0 : fd_circq_message_t * current_msg = (fd_circq_message_t *)(buf+circq->cursor); 193 0 : circq->cursor_seq++; 194 0 : if( FD_LIKELY( msg_sz ) ) *msg_sz = current_msg->footprint; 195 0 : return (uchar *)(current_msg+1); 196 0 : } 197 : 198 : int 199 : fd_circq_pop_until( fd_circq_t * circq, 200 0 : ulong cursor ) { 201 0 : if( FD_UNLIKELY( cursor>=circq->cursor_push_seq ) ) return -1; 202 : 203 0 : ulong oldest_seq = circq->cursor_push_seq-circq->cnt; 204 0 : if( FD_UNLIKELY( cursor<oldest_seq ) ) return 0; 205 : 206 0 : ulong to_pop = fd_ulong_min( cursor-oldest_seq+1UL, circq->cnt ); 207 : 208 0 : uchar * buf = (uchar *)(circq+1); 209 0 : for( ulong i=0UL; i<to_pop; i++ ) { 210 0 : fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->head); 211 0 : circq->cnt--; 212 : 213 0 : if( FD_UNLIKELY( !circq->cnt ) ) { 214 0 : circq->head = circq->tail = 0UL; 215 0 : } else { 216 0 : circq->head = message->next; 217 0 : FD_TEST( circq->head<circq->size ); 218 0 : } 219 0 : } 220 : 221 0 : if( FD_UNLIKELY( !circq->cnt ) ) circq->cursor = ULONG_MAX; 222 0 : return 0; 223 0 : } 224 : 225 : void 226 0 : fd_circq_reset_cursor( fd_circq_t * circq ) { 227 0 : circq->cursor = ULONG_MAX; 228 0 : } 229 : 230 : ulong 231 0 : fd_circq_bytes_used( fd_circq_t const * circq ) { 232 0 : if( FD_UNLIKELY( !circq->cnt ) ) return 0UL; 233 : 234 0 : uchar const * buf = (uchar const *)(circq+1); 235 : 236 0 : fd_circq_message_t const * tail_msg = (fd_circq_message_t const *)(buf+circq->tail); 237 0 : ulong tail_end = fd_ulong_align_up( circq->tail + sizeof(fd_circq_message_t), tail_msg->align ) + tail_msg->footprint; 238 : 239 0 : if( FD_LIKELY( circq->tail>=circq->head ) ) return tail_end - circq->head; 240 0 : else return (circq->size - circq->head) + tail_end; 241 0 : }