Line data Source code
1 : #include "fd_tpool.h"
2 :
3 : #if FD_HAS_THREADS
4 : #include <pthread.h>
5 : #endif
6 :
7 : struct fd_tpool_private_worker_cfg { /* Boot parameters handed from fd_tpool_worker_push to the new worker tile (smuggled through the argv pointer of fd_tile_exec_new) */
8 : fd_tpool_t * tpool; /* tpool the new worker joins */
9 : ulong tile_idx; /* tile index the worker runs on */
10 : };
11 :
12 : typedef struct fd_tpool_private_worker_cfg fd_tpool_private_worker_cfg_t;
13 :
14 : static int
15 : fd_tpool_private_worker_( int argc,
16 0 : char ** argv ) { /* Worker tile main: BOOT, then loop IDLE->EXEC until halted. argc/argv are not a real command line: they smuggle worker_idx and a cfg pointer through fd_tile_exec_new. Returns 0 on clean halt. */
17 0 : ulong worker_idx = (ulong)(uint)argc; /* this worker's slot in the tpool's worker table */
18 0 : fd_tpool_private_worker_cfg_t * cfg = (fd_tpool_private_worker_cfg_t *)argv;
19 :
20 0 : fd_tpool_t * tpool = cfg->tpool; /* copy cfg out now; cfg lives on the pusher's stack (kept live until we publish below) */
21 0 : ulong tile_idx = cfg->tile_idx;
22 :
23 : /* We are BOOT */
24 :
25 0 : fd_tpool_private_worker_t worker[1]; /* worker state lives on this tile's stack for the worker's lifetime */
26 :
27 0 : memset( worker, 0, sizeof(fd_tpool_private_worker_t) );
28 :
29 0 : worker->tile_idx = (uint) tile_idx;
30 :
31 0 : # if FD_HAS_THREADS
32 0 : int sleeper = !!(tpool->opt & FD_TPOOL_OPT_SLEEP); /* block on a condvar when idle instead of spinning? */
33 :
34 0 : pthread_mutex_t lock[1];
35 0 : pthread_cond_t wake[1];
36 :
37 0 : if( FD_UNLIKELY( sleeper ) ) {
38 0 : if( FD_UNLIKELY( pthread_mutex_init( lock, NULL ) ) ) FD_LOG_ERR(( "pthread_mutex_init failed" ));
39 0 : if( FD_UNLIKELY( pthread_cond_init ( wake, NULL ) ) ) FD_LOG_ERR(( "pthread_cond_init failed" ));
40 0 : if( FD_UNLIKELY( pthread_mutex_lock( lock ) ) ) FD_LOG_ERR(( "pthread_mutex_lock failed" )); /* held until halt; pthread_cond_wait releases it internally while waiting */
41 0 : }
42 :
43 0 : worker->lock = (ulong)lock; /* stash the pthread objects as ulongs so fd_tpool_private_wake can find them */
44 0 : worker->wake = (ulong)wake;
45 0 : # endif
46 :
47 0 : FD_COMPILER_MFENCE(); /* worker state init must be visible before the pointer is published below */
48 :
49 0 : fd_tpool_private_worker( tpool )[ worker_idx ] = worker; /* publish; fd_tpool_worker_push spins until this slot goes non-NULL */
50 :
51 0 : ulong const * arg = worker->arg;
52 0 : uint seq1 = worker->seq1; /* seq1==seq0 means idle; the dispatcher bumps seq0 to issue a command */
53 :
54 0 : for(;;) {
55 :
56 : /* We are IDLE ... see what we should do next */
57 :
58 0 : # if FD_HAS_THREADS
59 0 : if( FD_UNLIKELY( sleeper ) && FD_UNLIKELY( pthread_cond_wait( wake, lock ) ) )
60 0 : FD_LOG_WARNING(( "pthread_cond_wait failed; attempting to continue" )); /* spurious wakeups are fine; the seq check below re-verifies */
61 0 : # endif
62 :
63 0 : FD_COMPILER_MFENCE();
64 0 : uint seq0 = worker->seq0; /* read seq before the command fields (pairs with the dispatcher's write ordering) */
65 0 : FD_COMPILER_MFENCE();
66 0 : uint _arg_cnt = worker->arg_cnt;
67 0 : ulong _task = worker->task;
68 0 : FD_COMPILER_MFENCE();
69 :
70 0 : if( FD_UNLIKELY( seq0==seq1 ) ) { /* Got idle */
71 0 : FD_SPIN_PAUSE();
72 0 : continue;
73 0 : }
74 :
75 0 : if( FD_UNLIKELY( !_task ) ) break; /* Got halt */
76 :
77 : /* We are EXEC ... do the task and then transition to IDLE */
78 :
79 0 : # ifdef __cplusplus
80 0 : try { /* firewall exceptions thrown by tasks when compiled as C++ */
81 0 : # endif
82 :
83 0 : if( _arg_cnt==UINT_MAX ) { /* UINT_MAX tags the original fixed 12-argument task ABI */
84 :
85 0 : fd_tpool_task_t task = (fd_tpool_task_t)_task;
86 :
87 0 : void * task_tpool = (void *)arg[ 0];
88 0 : ulong task_t0 = arg[ 1]; ulong task_t1 = arg[ 2];
89 0 : void * task_args = (void *)arg[ 3];
90 0 : void * task_reduce = (void *)arg[ 4]; ulong task_stride = arg[ 5];
91 0 : ulong task_l0 = arg[ 6]; ulong task_l1 = arg[ 7];
92 0 : ulong task_m0 = arg[ 8]; ulong task_m1 = arg[ 9];
93 0 : ulong task_n0 = arg[10]; ulong task_n1 = arg[11];
94 :
95 0 : task( task_tpool,task_t0,task_t1, task_args, task_reduce,task_stride, task_l0,task_l1, task_m0,task_m1, task_n0,task_n1 );
96 :
97 0 : } else { /* otherwise arg_cnt is the argument count for the v2 variadic ABI */
98 :
99 0 : fd_tpool_task_v2_t task = (fd_tpool_task_v2_t)_task;
100 :
101 0 : task( tpool, worker_idx, (ulong)_arg_cnt, arg );
102 :
103 0 : }
104 :
105 0 : # ifdef __cplusplus
106 0 : } catch( ... ) {
107 0 : FD_LOG_WARNING(( "uncaught exception; attempting to continue" ));
108 0 : }
109 0 : # endif
110 :
111 0 : FD_COMPILER_MFENCE(); /* task side effects must be visible before completion is signaled */
112 :
113 0 : worker->seq1 = seq0; /* publish completion: observers see seq1 catch up to seq0 */
114 0 : seq1 = seq0;
115 0 : }
116 :
117 : /* We are HALT ... clean up and terminate */
118 :
119 0 : # if FD_HAS_THREADS
120 0 : if( FD_UNLIKELY( sleeper ) ) {
121 0 : if( FD_UNLIKELY( pthread_mutex_unlock ( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
122 0 : if( FD_UNLIKELY( pthread_cond_destroy ( wake ) ) ) FD_LOG_WARNING(( "pthread_cond_destroy failed; attempting to continue" ));
123 0 : if( FD_UNLIKELY( pthread_mutex_destroy( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_destroy failed; attempting to continue" ));
124 0 : }
125 0 : # endif
126 :
127 0 : return 0;
128 0 : }
129 :
130 : #if FD_HAS_THREADS
131 : void
132 0 : fd_tpool_private_wake( fd_tpool_private_worker_t * worker ) { /* Wake a worker booted with FD_TPOOL_OPT_SLEEP; signaling under the worker's lock so the signal cannot slip between the worker's seq check and its cond_wait */
133 0 : pthread_mutex_t * lock = (pthread_mutex_t *)worker->lock; /* stored as ulongs by the worker at boot */
134 0 : pthread_cond_t * wake = (pthread_cond_t *)worker->wake;
135 0 : if( FD_UNLIKELY( pthread_mutex_lock ( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_lock failed; attempting to continue" ));
136 0 : if( FD_UNLIKELY( pthread_cond_signal ( wake ) ) ) FD_LOG_WARNING(( "pthread_cond_signal failed; attempting to continue" ));
137 0 : if( FD_UNLIKELY( pthread_mutex_unlock( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
138 0 : }
139 : #endif
140 :
141 : ulong
142 0 : fd_tpool_align( void ) {
143 0 : return FD_TPOOL_ALIGN; /* alignment required of the mem region passed to fd_tpool_init */
144 0 : }
145 :
146 : ulong
147 0 : fd_tpool_footprint( ulong worker_max ) { /* Bytes needed for a tpool supporting up to worker_max workers; 0UL if worker_max is not in [1,FD_TILE_MAX] */
148 0 : if( FD_UNLIKELY( !((1UL<=worker_max) & (worker_max<=FD_TILE_MAX)) ) ) return 0UL;
149 0 : return fd_ulong_align_up( sizeof(fd_tpool_private_worker_t) + /* layout: worker0 state, then tpool header, then worker pointer table */
150 0 : sizeof(fd_tpool_t) + worker_max*sizeof(fd_tpool_private_worker_t *), FD_TPOOL_ALIGN );
151 0 : }
152 :
153 : fd_tpool_t *
154 : fd_tpool_init( void * mem,
155 : ulong worker_max,
156 0 : ulong opt ) { /* Format mem (aligned to fd_tpool_align, sized per fd_tpool_footprint) as a tpool with the caller as worker 0; returns the tpool or NULL with a warning on bad args */
157 :
158 0 : FD_COMPILER_MFENCE();
159 :
160 0 : if( FD_UNLIKELY( !mem ) ) {
161 0 : FD_LOG_WARNING(( "NULL mem" ));
162 0 : return NULL;
163 0 : }
164 :
165 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_tpool_align() ) ) ) {
166 0 : FD_LOG_WARNING(( "bad alignment" ));
167 0 : return NULL;
168 0 : }
169 :
170 0 : ulong footprint = fd_tpool_footprint( worker_max );
171 0 : if( FD_UNLIKELY( !footprint ) ) { /* footprint==0 indicates worker_max out of range */
172 0 : FD_LOG_WARNING(( "bad worker_max" ));
173 0 : return NULL;
174 0 : }
175 :
176 0 : fd_memset( mem, 0, footprint );
177 :
178 0 : fd_tpool_private_worker_t * worker0 = (fd_tpool_private_worker_t *)mem; /* worker 0 state is at the start of the region (what fd_tpool_fini returns) */
179 :
180 0 : worker0->seq0 = 1U; /* seq0!=seq1: worker 0 (the caller's own slot) does not report as idle -- NOTE(review): confirm against fd_tpool_worker_idle's semantics */
181 0 : worker0->seq1 = 0U;
182 :
183 0 : fd_tpool_t * tpool = (fd_tpool_t *)(worker0+1); /* tpool header immediately follows worker0 state */
184 :
185 0 : tpool->opt = opt;
186 0 : tpool->worker_max = (uint)worker_max;
187 0 : tpool->worker_cnt = 1U; /* the caller itself counts as worker 0 */
188 :
189 0 : FD_COMPILER_MFENCE();
190 0 : fd_tpool_private_worker( tpool )[0] = worker0;
191 0 : FD_COMPILER_MFENCE();
192 :
193 0 : return tpool;
194 0 : }
195 :
196 : void *
197 0 : fd_tpool_fini( fd_tpool_t * tpool ) { /* Halt and join all pushed workers, then unformat; returns the original mem region given to fd_tpool_init, or NULL with a warning on failure */
198 :
199 0 : FD_COMPILER_MFENCE();
200 :
201 0 : if( FD_UNLIKELY( !tpool ) ) {
202 0 : FD_LOG_WARNING(( "NULL tpool" ));
203 0 : return NULL;
204 0 : }
205 :
206 0 : while( fd_tpool_worker_cnt( tpool )>1UL ) { /* pop every worker except worker 0 (the caller) */
207 0 : if( FD_UNLIKELY( !fd_tpool_worker_pop( tpool ) ) ) {
208 0 : FD_LOG_WARNING(( "fd_tpool_worker_pop failed" ));
209 0 : return NULL;
210 0 : }
211 0 : }
212 :
213 0 : return (void *)fd_tpool_private_worker0( tpool ); /* worker0 state marks the start of the caller's mem region */
214 0 : }
215 :
216 : fd_tpool_t *
217 : fd_tpool_worker_push( fd_tpool_t * tpool,
218 0 : ulong tile_idx ) { /* Boot tile tile_idx as a new worker in tpool; returns tpool on success, NULL with a warning on failure (bad args, tpool full, tile already used) */
219 :
220 0 : FD_COMPILER_MFENCE();
221 :
222 0 : if( FD_UNLIKELY( !tpool ) ) {
223 0 : FD_LOG_WARNING(( "NULL tpool" ));
224 0 : return NULL;
225 0 : }
226 :
227 0 : if( FD_UNLIKELY( !tile_idx ) ) {
228 0 : FD_LOG_WARNING(( "cannot push tile_idx 0" ));
229 0 : return NULL;
230 0 : }
231 :
232 0 : if( FD_UNLIKELY( tile_idx==fd_tile_idx() ) ) {
233 0 : FD_LOG_WARNING(( "cannot push self" ));
234 0 : return NULL;
235 0 : }
236 :
237 0 : if( FD_UNLIKELY( tile_idx>=fd_tile_cnt() ) ) {
238 0 : FD_LOG_WARNING(( "invalid tile_idx" ));
239 0 : return NULL;
240 0 : }
241 :
242 0 : fd_tpool_private_worker_t ** worker = fd_tpool_private_worker( tpool );
243 0 : ulong worker_cnt = (ulong)tpool->worker_cnt;
244 :
245 0 : if( FD_UNLIKELY( worker_cnt>=(ulong)tpool->worker_max ) ) {
246 0 : FD_LOG_WARNING(( "too many workers" ));
247 0 : return NULL;
248 0 : }
249 :
250 0 : for( ulong worker_idx=0UL; worker_idx<worker_cnt; worker_idx++ ) /* reject duplicate tiles */
251 0 : if( worker[ worker_idx ]->tile_idx==tile_idx ) {
252 0 : FD_LOG_WARNING(( "tile_idx already added to tpool" ));
253 0 : return NULL;
254 0 : }
255 :
256 0 : fd_tpool_private_worker_cfg_t cfg[1]; /* on our stack; worker copies it out before publishing its slot (we wait for that below), so this is safe */
257 :
258 0 : cfg->tpool = tpool;
259 0 : cfg->tile_idx = tile_idx;
260 :
261 0 : int argc = (int)(uint)worker_cnt; /* worker idx smuggled through argc */
262 0 : char ** argv = (char **)fd_type_pun( cfg ); /* cfg pointer smuggled through argv */
263 :
264 0 : FD_COMPILER_MFENCE();
265 0 : worker[ worker_cnt ] = NULL; /* NULL sentinel: the new worker publishes its state pointer here when booted */
266 0 : FD_COMPILER_MFENCE();
267 :
268 0 : if( FD_UNLIKELY( !fd_tile_exec_new( tile_idx, fd_tpool_private_worker_, argc, argv ) ) ) {
269 0 : FD_LOG_WARNING(( "fd_tile_exec_new failed (tile probably already in use)" ));
270 0 : return NULL;
271 0 : }
272 :
273 0 : while( !FD_VOLATILE_CONST( worker[ worker_cnt ] ) ) FD_SPIN_PAUSE(); /* wait for the worker to finish booting (also keeps cfg live long enough) */
274 :
275 0 : tpool->worker_cnt = (uint)(worker_cnt + 1UL);
276 0 : return tpool;
277 0 : }
278 :
279 : fd_tpool_t *
280 0 : fd_tpool_worker_pop( fd_tpool_t * tpool ) { /* Halt and join the most recently pushed worker; returns tpool on success, NULL with a warning on failure */
281 :
282 0 : FD_COMPILER_MFENCE();
283 :
284 0 : if( FD_UNLIKELY( !tpool ) ) {
285 0 : FD_LOG_WARNING(( "NULL tpool" ));
286 0 : return NULL;
287 0 : }
288 :
289 0 : ulong worker_cnt = (ulong)tpool->worker_cnt;
290 0 : if( FD_UNLIKELY( worker_cnt<=1UL ) ) { /* worker 0 is the caller and cannot be popped */
291 0 : FD_LOG_WARNING(( "no workers to pop" ));
292 0 : return NULL;
293 0 : }
294 :
295 : /* Testing for IDLE isn't strictly necessary given requirements to use
296 : this and this isn't being done atomically with the actual pop but
297 : does help catch obvious user errors. */
298 :
299 0 : if( FD_UNLIKELY( !fd_tpool_worker_idle( tpool, worker_cnt-1UL ) ) ) {
300 0 : FD_LOG_WARNING(( "worker to pop is not idle" ));
301 0 : return NULL;
302 0 : }
303 :
304 : /* Send HALT to the worker */
305 :
306 0 : fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_cnt-1UL ];
307 0 : uint seq0 = worker->seq0 + 1U;
308 0 : fd_tpool_private_worker_t * exec; fd_tile_exec_t * exec = fd_tile_exec( worker->tile_idx );
309 :
310 0 : worker->task = 0UL; /* NULL task is the halt command */
311 0 : FD_COMPILER_MFENCE();
312 0 : worker->seq0 = seq0; /* seq bump publishes the command (task written before seq, pairing with the worker's read order) */
313 0 : FD_COMPILER_MFENCE();
314 0 : if( FD_UNLIKELY( tpool->opt & FD_TPOOL_OPT_SLEEP ) ) fd_tpool_private_wake( worker ); /* sleeping workers need an explicit wake to see the halt */
315 :
316 : /* Wait for the worker to shutdown */
317 :
318 0 : int ret;
319 0 : char const * err = fd_tile_exec_delete( exec, &ret ); /* join the tile; worker returns 0 on clean halt */
320 0 : if( FD_UNLIKELY( err ) ) FD_LOG_WARNING(( "tile err \"%s\" unexpected; attempting to continue", err ));
321 0 : else if( FD_UNLIKELY( ret ) ) FD_LOG_WARNING(( "tile ret %i unexpected; attempting to continue", ret ));
322 :
323 0 : tpool->worker_cnt = (uint)(worker_cnt - 1UL);
324 0 : return tpool;
325 0 : }
326 :
327 : #define FD_TPOOL_EXEC_ALL_IMPL_HDR(style) /* opens a dispatch function: recursively fork the worker range [node_t0,node_t1) into a binary tree; the instantiation body then runs this node's share */ \
328 : void \
329 : fd_tpool_private_exec_all_##style##_node( void * _node_tpool, \
330 : ulong node_t0, ulong node_t1, \
331 : void * args, \
332 : void * reduce, ulong stride, \
333 : ulong l0, ulong l1, \
334 : ulong _task, ulong _tpool, \
335 0 : ulong t0, ulong t1 ) { \
336 0 : fd_tpool_t * node_tpool = (fd_tpool_t * )_node_tpool; \
337 0 : fd_tpool_task_t task = (fd_tpool_task_t)_task; \
338 0 : ulong wait_cnt = 0UL; \
339 0 : ushort wait_child[16]; /* Assumes tpool_cnt<=65536 */ \
340 0 : for(;;) { \
341 0 : ulong node_t_cnt = node_t1 - node_t0; \
342 0 : if( node_t_cnt<=1L ) break; /* range down to a single worker: stop forking */ \
343 0 : ulong node_ts = node_t0 + fd_tpool_private_split( node_t_cnt ); \
344 0 : fd_tpool_exec( node_tpool, node_ts, fd_tpool_private_exec_all_##style##_node, \
345 0 : node_tpool, node_ts,node_t1, args, reduce,stride, l0,l1, _task,_tpool, t0,t1 ); \
346 0 : wait_child[ wait_cnt++ ] = (ushort)node_ts; /* remember child for the FTR's waits */ \
347 0 : node_t1 = node_ts; /* keep the lower half for ourselves */ \
348 0 : }
349 :
350 : #define FD_TPOOL_EXEC_ALL_IMPL_FTR /* closes the dispatch function: wait for children forked by the HDR, most recently forked first */ \
351 0 : while( wait_cnt ) fd_tpool_wait( node_tpool, (ulong)wait_child[ --wait_cnt ] ); \
352 0 : }
353 :
354 0 : FD_TPOOL_EXEC_ALL_IMPL_HDR(rrobin) /* round-robin: worker i handles elements l0+i, l0+i+(t1-t0), ... */
355 0 : ulong m_stride = t1-t0; /* per-element step == total worker count */
356 0 : ulong m = l0 + fd_ulong_min( node_t0-t0, ULONG_MAX-l0 ); /* robust against overflow */
357 0 : while( m<l1 ) {
358 0 : task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
359 0 : m += fd_ulong_min( m_stride, ULONG_MAX-m ); /* robust against overflow */
360 0 : }
361 0 : FD_TPOOL_EXEC_ALL_IMPL_FTR
362 :
363 0 : FD_TPOOL_EXEC_ALL_IMPL_HDR(block) /* block: each worker gets one contiguous subrange of [l0,l1), dispatched element by element */
364 0 : ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
365 0 : for( ulong m=m0; m<m1; m++ ) task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
366 0 : FD_TPOOL_EXEC_ALL_IMPL_FTR
367 :
368 : #if FD_HAS_ATOMIC
369 0 : FD_TPOOL_EXEC_ALL_IMPL_HDR(taskq) /* taskq: dynamic load balancing; each worker atomically claims the next unprocessed element */
370 0 : ulong * l_next = (ulong *)_tpool; /* _tpool here points at a pair: [0]=next unclaimed element index, [1]=the real tpool arg to pass to task */
371 0 : void * tpool = (void *)l_next[1];
372 0 : for(;;) {
373 :
374 : /* Note that we use an ATOMIC_CAS here instead of an
375 : ATOMIC_FETCH_AND_ADD to avoid overflow risks by having threads
376 : increment l0 into the tail. ATOMIC_FETCH_AND_ADD could be used
377 : if there is no requirement to the effect that l1+FD_TILE_MAX does
378 : not overflow. */
379 :
380 0 : FD_COMPILER_MFENCE();
381 0 : ulong m0 = *l_next;
382 0 : FD_COMPILER_MFENCE();
383 :
384 0 : if( FD_UNLIKELY( m0>=l1 ) ) break; /* all elements claimed */
385 0 : ulong m1 = m0+1UL;
386 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( l_next, m0, m1 )!=m0 ) ) { /* lost the claim race; retry */
387 0 : FD_SPIN_PAUSE();
388 0 : continue;
389 0 : }
390 :
391 0 : task( tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
392 0 : }
393 0 : FD_TPOOL_EXEC_ALL_IMPL_FTR
394 : #endif
395 :
396 0 : FD_TPOOL_EXEC_ALL_IMPL_HDR(batch) /* batch: one task call per worker covering its whole contiguous subrange */
397 0 : ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
398 0 : task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
399 0 : FD_TPOOL_EXEC_ALL_IMPL_FTR
400 :
401 0 : FD_TPOOL_EXEC_ALL_IMPL_HDR(raw) /* raw: task is called once per worker with the full [l0,l1); the task uses node_t0/node_t1 to split the work itself */
402 0 : task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, l0,l1, node_t0,node_t1 );
403 0 : FD_TPOOL_EXEC_ALL_IMPL_FTR
404 :
405 : #undef FD_TPOOL_EXEC_ALL_IMPL_FTR
406 : #undef FD_TPOOL_EXEC_ALL_IMPL_HDR
|