Line data Source code
1 : #include "fd_tpu.h"
2 : #include "fd_tpu_reasm_private.h"
3 : #include "../../tango/dcache/fd_dcache.h"
4 : #include "../../tango/mcache/fd_mcache.h"
5 :
6 : FD_FN_CONST ulong
7 0 : fd_tpu_reasm_align( void ) {
8 0 : return alignof(fd_tpu_reasm_t);
9 0 : }
10 :
11 : FD_FN_CONST ulong
12 : fd_tpu_reasm_footprint( ulong depth,
13 0 : ulong burst ) {
14 :
15 0 : if( FD_UNLIKELY(
16 0 : ( fd_ulong_popcnt( depth )!=1 ) |
17 0 : ( depth>0x7fffffffUL ) |
18 0 : ( burst<2 ) |
19 0 : ( burst>0x7fffffffUL ) ) )
20 0 : return 0UL;
21 :
22 0 : ulong slot_cnt = depth+burst;
23 0 : ulong chain_cnt = fd_tpu_reasm_map_chain_cnt_est( slot_cnt );
24 0 : return FD_LAYOUT_FINI( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_INIT,
25 0 : fd_tpu_reasm_align(), sizeof(fd_tpu_reasm_t) ), /* hdr */
26 0 : alignof(uint), depth *sizeof(uint) ), /* pub_slots */
27 0 : alignof(fd_tpu_reasm_slot_t), slot_cnt*sizeof(fd_tpu_reasm_slot_t) ), /* slots */
28 0 : fd_tpu_reasm_map_align(), fd_tpu_reasm_map_footprint( chain_cnt ) ), /* map */
29 0 : fd_tpu_reasm_align() );
30 :
31 0 : }
32 :
33 : void *
34 : fd_tpu_reasm_new( void * shmem,
35 : ulong depth,
36 : ulong burst,
37 : ulong orig,
38 0 : void * dcache ) {
39 :
40 0 : if( FD_UNLIKELY( !shmem ) ) return NULL;
41 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, FD_TPU_REASM_ALIGN ) ) ) return NULL;
42 0 : if( FD_UNLIKELY( !fd_tpu_reasm_footprint( depth, burst ) ) ) return NULL;
43 0 : if( FD_UNLIKELY( orig > FD_FRAG_META_ORIG_MAX ) ) return NULL;
44 :
45 0 : ulong req_data_sz = fd_tpu_reasm_req_data_sz( depth, burst );
46 0 : if( FD_UNLIKELY( fd_dcache_data_sz( dcache )<req_data_sz ) ) {
47 0 : FD_LOG_WARNING(( "dcache data_sz is too small (need %lu, have %lu)", req_data_sz, fd_dcache_data_sz( dcache ) ));
48 0 : return NULL;
49 0 : }
50 :
51 : /* Memory layout */
52 :
53 0 : ulong slot_cnt = depth+burst;
54 0 : if( FD_UNLIKELY( !slot_cnt ) ) return NULL;
55 0 : ulong chain_cnt = fd_tpu_reasm_map_chain_cnt_est( slot_cnt );
56 :
57 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
58 0 : fd_tpu_reasm_t * reasm = FD_SCRATCH_ALLOC_APPEND( l, fd_tpu_reasm_align(), sizeof(fd_tpu_reasm_t) );
59 0 : ulong * pub_slots = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint), depth*sizeof(uint) );
60 0 : fd_tpu_reasm_slot_t * slots = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_tpu_reasm_slot_t), slot_cnt*sizeof(fd_tpu_reasm_slot_t) );
61 0 : void * map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_tpu_reasm_map_align(), fd_tpu_reasm_map_footprint( chain_cnt ) );
62 0 : FD_SCRATCH_ALLOC_FINI( l, fd_tpu_reasm_align() );
63 :
64 0 : fd_memset( reasm, 0, sizeof(fd_tpu_reasm_t) );
65 0 : fd_memset( slots, 0, burst*sizeof(fd_tpu_reasm_slot_t) );
66 :
67 0 : fd_tpu_reasm_map_t * map = fd_tpu_reasm_map_join( fd_tpu_reasm_map_new( map_mem, chain_cnt, 0UL ) );
68 0 : if( FD_UNLIKELY( !map ) ) {
69 0 : FD_LOG_WARNING(( "fd_tpu_reasm_map_new failed" ));
70 0 : return NULL;
71 0 : }
72 :
73 : /* Initialize reasm object */
74 :
75 0 : reasm->slots_off = (ulong)( (uchar *)slots - (uchar *)reasm );
76 0 : reasm->pub_slots_off = (ulong)( (uchar *)pub_slots - (uchar *)reasm );
77 0 : reasm->map_off = (ulong)( (uchar *)map - (uchar *)reasm );
78 0 : reasm->dcache = dcache;
79 :
80 0 : reasm->depth = (uint)depth;
81 0 : reasm->burst = (uint)burst;
82 0 : reasm->head = (uint)slot_cnt-1U;
83 0 : reasm->tail = (uint)depth;
84 0 : reasm->slot_cnt = (uint)slot_cnt;
85 0 : reasm->orig = (ushort)orig;
86 :
87 : /* Initial slot distribution */
88 :
89 0 : fd_tpu_reasm_reset( reasm );
90 :
91 0 : FD_COMPILER_MFENCE();
92 0 : reasm->magic = FD_TPU_REASM_MAGIC;
93 0 : FD_COMPILER_MFENCE();
94 :
95 0 : return reasm;
96 0 : }
97 :
98 : void
99 0 : fd_tpu_reasm_reset( fd_tpu_reasm_t * reasm ) {
100 :
101 0 : uint depth = reasm->depth;
102 0 : uint burst = reasm->burst;
103 0 : uint node_cnt = depth+burst;
104 :
105 0 : fd_tpu_reasm_slot_t * slots = fd_tpu_reasm_slots_laddr( reasm );
106 0 : uint * pub_slots = fd_tpu_reasm_pub_slots_laddr( reasm );
107 0 : fd_tpu_reasm_map_t * map = fd_tpu_reasm_map_laddr( reasm );
108 :
109 : /* The initial state moves the first 'depth' slots to the mcache (PUB)
110 : and leaves the rest as FREE. */
111 :
112 0 : for( uint j=0U; j<depth; j++ ) {
113 0 : fd_tpu_reasm_slot_t * slot = slots + j;
114 0 : slot->k.state = FD_TPU_REASM_STATE_PUB;
115 0 : slot->k.conn_uid = ULONG_MAX;
116 0 : slot->k.stream_id = 0xffffffffffff;
117 0 : slot->k.sz = 0;
118 0 : slot->chain_next = UINT_MAX;
119 0 : pub_slots[ j ] = j;
120 0 : }
121 0 : for( uint j=depth; j<node_cnt; j++ ) {
122 0 : fd_tpu_reasm_slot_t * slot = slots + j;
123 0 : slot->k.state = FD_TPU_REASM_STATE_FREE;
124 0 : slot->k.conn_uid = ULONG_MAX;
125 0 : slot->k.stream_id = 0xffffffffffff;
126 0 : slot->k.sz = 0;
127 0 : slot->lru_prev = fd_uint_if( j<node_cnt-1U, j+1U, UINT_MAX );
128 0 : slot->lru_next = fd_uint_if( j>depth, j-1U, UINT_MAX );
129 0 : slot->chain_next = UINT_MAX;
130 0 : }
131 :
132 : /* Clear the entire hash map */
133 :
134 0 : ulong chain_cnt = fd_tpu_reasm_map_chain_cnt( map );
135 0 : uint * chains = fd_tpu_reasm_map_private_chain( map );
136 0 : for( uint j=0U; j<chain_cnt; j++ ) {
137 0 : chains[ j ] = UINT_MAX;
138 0 : }
139 0 : }
140 :
141 : fd_tpu_reasm_t *
142 0 : fd_tpu_reasm_join( void * shreasm ) {
143 0 : fd_tpu_reasm_t * reasm = shreasm;
144 0 : if( FD_UNLIKELY( reasm->magic != FD_TPU_REASM_MAGIC ) ) {
145 0 : FD_LOG_WARNING(( "bad magic" ));
146 0 : return NULL;
147 0 : }
148 0 : return reasm;
149 0 : }
150 :
151 : void *
152 0 : fd_tpu_reasm_leave( fd_tpu_reasm_t * reasm ) {
153 0 : return reasm;
154 0 : }
155 :
156 : void *
157 0 : fd_tpu_reasm_delete( void * shreasm ) {
158 0 : fd_tpu_reasm_t * reasm = shreasm;
159 0 : if( FD_UNLIKELY( !reasm ) ) return NULL;
160 0 : reasm->magic = 0UL;
161 0 : return shreasm;
162 0 : }
163 :
164 : fd_tpu_reasm_slot_t *
165 : fd_tpu_reasm_query( fd_tpu_reasm_t * reasm,
166 : ulong conn_uid,
167 0 : ulong stream_id ) {
168 0 : return smap_query( reasm, conn_uid, stream_id );
169 0 : }
170 :
171 : fd_tpu_reasm_slot_t *
172 : fd_tpu_reasm_prepare( fd_tpu_reasm_t * reasm,
173 : ulong conn_uid,
174 : ulong stream_id,
175 0 : long tsorig ) {
176 0 : fd_tpu_reasm_slot_t * slot = slotq_pop_tail( reasm );
177 0 : smap_remove( reasm, slot );
178 0 : slot_begin( slot );
179 0 : slotq_push_head( reasm, slot );
180 0 : slot->k.conn_uid = conn_uid;
181 0 : slot->k.stream_id = stream_id & FD_TPU_REASM_SID_MASK;
182 0 : smap_insert( reasm, slot );
183 0 : slot->tsorig_comp = (uint)fd_frag_meta_ts_comp( tsorig );
184 0 : return slot;
185 0 : }
186 :
187 : int
188 : fd_tpu_reasm_frag( fd_tpu_reasm_t * reasm,
189 : fd_tpu_reasm_slot_t * slot,
190 : uchar const * data,
191 : ulong data_sz,
192 0 : ulong data_off ) {
193 :
194 0 : if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) )
195 0 : return FD_TPU_REASM_ERR_STATE;
196 :
197 0 : ulong slot_idx = slot_get_idx( reasm, slot );
198 0 : ulong mtu = FD_TPU_REASM_MTU;
199 0 : ulong sz0 = slot->k.sz;
200 :
201 0 : if( FD_UNLIKELY( data_off>sz0 ) ) {
202 0 : return FD_TPU_REASM_ERR_SKIP;
203 0 : }
204 :
205 0 : if( FD_UNLIKELY( data_off<sz0 ) ) {
206 : /* Fragment partially known ... should not happen */
207 0 : ulong skip = sz0 - data_off;
208 0 : if( skip>data_sz ) return FD_TPU_REASM_SUCCESS;
209 0 : data_off += skip;
210 0 : data_sz -= skip;
211 0 : data += skip;
212 0 : }
213 :
214 0 : ulong sz1 = sz0 + data_sz;
215 0 : if( FD_UNLIKELY( (sz1<sz0)|(sz1>mtu) ) ) {
216 0 : fd_tpu_reasm_cancel( reasm, slot );
217 0 : return FD_TPU_REASM_ERR_SZ;
218 0 : }
219 :
220 0 : uchar * msg = slot_get_data_pkt_payload( reasm, slot_idx );
221 0 : fd_memcpy( msg+sz0, data, data_sz );
222 :
223 0 : slot->k.sz = (ushort)( sz1 & FD_TPU_REASM_SZ_MASK );
224 0 : return FD_TPU_REASM_SUCCESS;
225 0 : }
226 :
227 : int
228 : fd_tpu_reasm_publish( fd_tpu_reasm_t * reasm,
229 : fd_tpu_reasm_slot_t * slot,
230 : fd_frag_meta_t * mcache,
231 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
232 : ulong seq,
233 : long tspub,
234 : uint source_ipv4,
235 0 : uchar source_tpu ) {
236 :
237 0 : ulong depth = reasm->depth;
238 :
239 0 : if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) )
240 0 : return FD_TPU_REASM_ERR_STATE;
241 :
242 : /* Derive chunk index */
243 0 : uint slot_idx = slot_get_idx( reasm, slot );
244 0 : uchar * buf = slot_get_data( reasm, slot_idx );
245 0 : ulong chunk = fd_laddr_to_chunk( base, buf );
246 0 : if( FD_UNLIKELY( ( (ulong)buf<(ulong)base ) |
247 0 : ( chunk>UINT_MAX ) ) ) {
248 0 : FD_LOG_CRIT(( "invalid base %p for slot %p in tpu_reasm %p",
249 0 : base, (void *)slot, (void *)reasm ));
250 0 : }
251 :
252 : /* Find least recently published slot. This is our "freed slot".
253 : (Every time a new slot is published, another slot is simultaneously
254 : freed) */
255 0 : uint * pub_slot = fd_tpu_reasm_pub_slots_laddr( reasm ) + fd_mcache_line_idx( seq, depth );
256 0 : uint freed_slot_idx = *pub_slot;
257 0 : if( FD_UNLIKELY( freed_slot_idx >= reasm->slot_cnt ) ) {
258 : /* mcache corruption */
259 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm slot %u out of bounds (max %u)",
260 0 : freed_slot_idx, reasm->slot_cnt ));
261 0 : fd_tpu_reasm_reset( reasm );
262 0 : return FD_TPU_REASM_ERR_STATE;
263 0 : }
264 :
265 : /* Publish to mcache */
266 0 : ulong sz = slot->k.sz;
267 0 : ulong ctl = fd_frag_meta_ctl( reasm->orig, 1, 1, 0 );
268 0 : ulong tsorig_comp = slot->tsorig_comp;
269 0 : ulong tspub_comp = fd_frag_meta_ts_comp( tspub );
270 :
271 0 : fd_txn_m_t * txnm = (fd_txn_m_t *)buf;
272 0 : *txnm = (fd_txn_m_t) { 0UL };
273 0 : txnm->payload_sz = (ushort)sz;
274 0 : txnm->source_ipv4 = source_ipv4;
275 0 : txnm->source_tpu = source_tpu;
276 :
277 0 : # if FD_HAS_AVX
278 0 : fd_mcache_publish_avx( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
279 : # elif FD_HAS_SSE
280 : fd_mcache_publish_sse( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
281 : # else
282 : fd_mcache_publish ( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
283 : # endif
284 :
285 : /* Mark new slot as published */
286 0 : slotq_remove( reasm, slot );
287 0 : slot->k.state = FD_TPU_REASM_STATE_PUB;
288 0 : *pub_slot = slot_idx;
289 :
290 : /* Free oldest published slot */
291 0 : fd_tpu_reasm_slot_t * free_slot = fd_tpu_reasm_slots_laddr( reasm ) + freed_slot_idx;
292 0 : uint free_slot_state = free_slot->k.state;
293 0 : if( FD_UNLIKELY( free_slot_state != FD_TPU_REASM_STATE_PUB ) ) {
294 : /* mcache/slots out of sync (memory leak) */
295 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm seq %lu owns slot %u, but it's state is %u",
296 0 : seq, freed_slot_idx, free_slot_state ));
297 0 : fd_tpu_reasm_reset( reasm );
298 0 : return FD_TPU_REASM_ERR_STATE;
299 0 : }
300 0 : free_slot->k.state = FD_TPU_REASM_STATE_FREE;
301 0 : slotq_push_tail( reasm, free_slot );
302 :
303 0 : return FD_TPU_REASM_SUCCESS;
304 0 : }
305 :
306 : void
307 : fd_tpu_reasm_cancel( fd_tpu_reasm_t * reasm,
308 0 : fd_tpu_reasm_slot_t * slot ) {
309 0 : if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) ) return;
310 0 : slotq_remove( reasm, slot );
311 0 : smap_remove( reasm, slot );
312 0 : slot->k.state = FD_TPU_REASM_STATE_FREE;
313 0 : slot->k.conn_uid = ULONG_MAX;
314 0 : slot->k.stream_id = 0UL;
315 0 : slotq_push_tail( reasm, slot );
316 0 : }
317 :
318 : int
319 : fd_tpu_reasm_publish_fast( fd_tpu_reasm_t * reasm,
320 : uchar const * data,
321 : ulong sz,
322 : fd_frag_meta_t * mcache,
323 : void * base, /* Assumed aligned FD_CHUNK_ALIGN */
324 : ulong seq,
325 : long tspub,
326 : uint source_ipv4,
327 0 : uchar source_tpu ) {
328 :
329 0 : ulong depth = reasm->depth;
330 0 : if( FD_UNLIKELY( sz>FD_TPU_REASM_MTU ) ) return FD_TPU_REASM_ERR_SZ;
331 :
332 : /* Acquire least recent slot. This is our "new slot" */
333 0 : fd_tpu_reasm_slot_t * slot = slotq_pop_tail( reasm );
334 0 : smap_remove( reasm, slot );
335 0 : slot_begin( slot );
336 :
337 : /* Derive buffer address of new slot */
338 0 : uint slot_idx = slot_get_idx( reasm, slot );
339 0 : uchar * buf = slot_get_data( reasm, slot_idx );
340 0 : ulong chunk = fd_laddr_to_chunk( base, buf );
341 0 : if( FD_UNLIKELY( ( (ulong)buf<(ulong)base ) |
342 0 : ( chunk>UINT_MAX ) ) ) {
343 0 : FD_LOG_ERR(( "Computed invalid chunk index (base=%p buf=%p chunk=%lx)",
344 0 : base, (void *)buf, chunk ));
345 0 : }
346 :
347 : /* Find least recently published slot. This is our "freed slot".
348 : (Every time a new slot is published, another slot is simultaneously
349 : freed) */
350 0 : uint * pub_slot = fd_tpu_reasm_pub_slots_laddr( reasm ) + fd_mcache_line_idx( seq, depth );
351 0 : uint freed_slot_idx = *pub_slot;
352 0 : if( FD_UNLIKELY( freed_slot_idx >= reasm->slot_cnt ) ) {
353 : /* mcache corruption */
354 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm slot %u out of bounds (max %u)",
355 0 : freed_slot_idx, reasm->slot_cnt ));
356 0 : fd_tpu_reasm_reset( reasm );
357 0 : return FD_TPU_REASM_ERR_STATE;
358 0 : }
359 :
360 : /* Copy data into new slot */
361 0 : FD_COMPILER_MFENCE();
362 0 : slot->k.sz = sz & FD_TPU_REASM_SZ_MASK;
363 0 : fd_txn_m_t * txnm = (fd_txn_m_t *)buf;
364 0 : *txnm = (fd_txn_m_t) { 0UL };
365 0 : txnm->payload_sz = (ushort)slot->k.sz,
366 0 : txnm->source_ipv4 = source_ipv4;
367 0 : txnm->source_tpu = source_tpu;
368 0 : fd_memcpy( buf + sizeof(fd_txn_m_t), data, sz );
369 0 : FD_COMPILER_MFENCE();
370 0 : slot->k.state = FD_TPU_REASM_STATE_PUB;
371 0 : FD_COMPILER_MFENCE();
372 :
373 : /* Publish new slot, while simultaneously removing all references to
374 : the old slot */
375 0 : *pub_slot = slot_idx;
376 0 : ulong ctl = fd_frag_meta_ctl( reasm->orig, 1, 1, 0 );
377 0 : uint tsorig_comp = slot->tsorig_comp;
378 0 : uint tspub_comp = (uint)fd_frag_meta_ts_comp( tspub );
379 0 : # if FD_HAS_AVX
380 0 : fd_mcache_publish_avx( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
381 : # elif FD_HAS_SSE
382 : fd_mcache_publish_sse( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
383 : # else
384 : fd_mcache_publish ( mcache, depth, seq, 0UL, chunk, fd_txn_m_realized_footprint( txnm, 0, 0 ), ctl, tsorig_comp, tspub_comp );
385 : # endif
386 :
387 : /* Free old slot */
388 0 : fd_tpu_reasm_slot_t * free_slot = fd_tpu_reasm_slots_laddr( reasm ) + freed_slot_idx;
389 0 : uint free_slot_state = free_slot->k.state;
390 0 : if( FD_UNLIKELY( free_slot_state != FD_TPU_REASM_STATE_PUB ) ) {
391 : /* mcache/slots out of sync (memory leak) */
392 0 : FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm seq %lu owns slot %u, but it's state is %u",
393 0 : seq, freed_slot_idx, free_slot_state ));
394 0 : fd_tpu_reasm_reset( reasm );
395 0 : return FD_TPU_REASM_ERR_STATE;
396 0 : }
397 0 : free_slot->k.state = FD_TPU_REASM_STATE_FREE;
398 0 : slotq_push_tail( reasm, free_slot );
399 0 : return FD_TPU_REASM_SUCCESS;
400 0 : }
|