LCOV - code coverage report
Current view: top level - util/tpool - fd_tpool.inc (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 258 0.0 %
Date: 2026-03-19 18:19:27 Functions: 0 13 0.0 %

          Line data    Source code
       1             : #include "fd_tpool.h"
       2             : 
       3             : #if FD_HAS_THREADS
       4             : #include <pthread.h>
       5             : #endif
       6             : 
       /* fd_tpool_private_worker_cfg carries the boot parameters that
          fd_tpool_worker_push hands to a freshly launched worker.  The
          pusher fills one in on its own stack and passes its address
          through the argv slot of fd_tile_exec_new; the worker reads both
          fields at the top of fd_tpool_private_worker_. */
       7             : struct fd_tpool_private_worker_cfg {
       8             :   fd_tpool_t * tpool;    /* tpool the new worker joins */
       9             :   ulong        tile_idx; /* tile the new worker runs on */
      10             : };
      11             : 
      12             : typedef struct fd_tpool_private_worker_cfg fd_tpool_private_worker_cfg_t;
      13             : 
      /* fd_tpool_private_worker_ is the tile main that fd_tpool_worker_push
         launches through fd_tile_exec_new.  argc and argv are not a real
         command line: argc carries the worker index and argv points at a
         fd_tpool_private_worker_cfg_t.  The worker state is a local of
         this function; its address is published into the tpool's worker
         table and the function then loops, running whatever task is
         posted with a new seq0, until it sees a new seq0 with a zero task
         (the halt request).  Always returns 0. */
      14             : static int
      15             : fd_tpool_private_worker_( int     argc,
      16           0 :                           char ** argv ) {
      17           0 :   ulong                           worker_idx = (ulong)(uint)argc;
      18           0 :   fd_tpool_private_worker_cfg_t * cfg        = (fd_tpool_private_worker_cfg_t *)argv;
      19             : 
                         /* cfg lives in the pusher's stack frame.  Both fields are copied
                            out here, before this worker publishes itself below; the
                            pusher does not return until it has seen that publication. */
      20           0 :   fd_tpool_t * tpool    = cfg->tpool;
      21           0 :   ulong        tile_idx = cfg->tile_idx;
      22             : 
      23             :   /* We are BOOT */
      24             : 
      25           0 :   fd_tpool_private_worker_t worker[1];
      26             : 
      27           0 :   memset( worker, 0, sizeof(fd_tpool_private_worker_t) );
      28             : 
      29           0 :   worker->tile_idx = (uint) tile_idx;
      30             : 
      31           0 : # if FD_HAS_THREADS
      32           0 :   int sleeper = !!(tpool->opt & FD_TPOOL_OPT_SLEEP);
      33             : 
      34           0 :   pthread_mutex_t lock[1];
      35           0 :   pthread_cond_t  wake[1];
      36             : 
                         /* In sleep mode, create the mutex / condition variable pair and
                            take the mutex right away.  It is not unlocked again until the
                            halt path at the bottom of this function.
                            NOTE(review): pthread_cond_wait releases the mutex only while
                            blocked, so fd_tpool_private_wake (which locks before it
                            signals) should only be able to signal a worker that is
                            actually waiting -- confirm this is the intent. */
      37           0 :   if( FD_UNLIKELY( sleeper ) ) {
      38           0 :     if( FD_UNLIKELY( pthread_mutex_init( lock, NULL ) ) ) FD_LOG_ERR(( "pthread_mutex_init failed" ));
      39           0 :     if( FD_UNLIKELY( pthread_cond_init ( wake, NULL ) ) ) FD_LOG_ERR(( "pthread_cond_init failed"  ));
      40           0 :     if( FD_UNLIKELY( pthread_mutex_lock( lock       ) ) ) FD_LOG_ERR(( "pthread_mutex_lock failed" ));
      41           0 :   }
      42             : 
                         /* The addresses are recorded unconditionally.  When sleep mode is
                            off, the mutex and condition variable were never initialized. */
      43           0 :   worker->lock = (ulong)lock;
      44           0 :   worker->wake = (ulong)wake;
      45           0 : # endif
      46             : 
      47           0 :   FD_COMPILER_MFENCE();
      48             : 
                         /* Publish this worker.  fd_tpool_worker_push spins until this
                            slot of the worker table becomes non-NULL. */
      49           0 :   fd_tpool_private_worker( tpool )[ worker_idx ] = worker;
      50             : 
                         /* seq1 caches the sequence number of the most recently completed
                            task (0 here, from the memset above). */
      51           0 :   ulong const * arg  = worker->arg;
      52           0 :   uint          seq1 = worker->seq1;
      53             : 
      54           0 :   for(;;) {
      55             : 
      56             :     /* We are IDLE ... see what we should do next */
      57             : 
                           /* In sleep mode, block on the condition variable first.  A
                              failed wait is only logged; the loop carries on. */
      58           0 : #   if FD_HAS_THREADS
      59           0 :     if( FD_UNLIKELY( sleeper ) && FD_UNLIKELY( pthread_cond_wait( wake, lock ) ) )
      60           0 :       FD_LOG_WARNING(( "pthread_cond_wait failed; attempting to continue" ));
      61           0 : #   endif
      62             : 
                           /* Read seq0 first, then arg_cnt and task, with fences between
                              the reads so the compiler keeps them in this order. */
      63           0 :     FD_COMPILER_MFENCE();
      64           0 :     uint  seq0     = worker->seq0;
      65           0 :     FD_COMPILER_MFENCE();
      66           0 :     uint  _arg_cnt = worker->arg_cnt;
      67           0 :     ulong _task    = worker->task;
      68           0 :     FD_COMPILER_MFENCE();
      69             : 
                           /* seq0 unchanged since the last completed task: nothing new
                              was posted, so go around again. */
      70           0 :     if( FD_UNLIKELY( seq0==seq1 ) ) { /* Got idle */
      71           0 :       FD_SPIN_PAUSE();
      72           0 :       continue;
      73           0 :     }
      74             : 
                           /* A new seq0 together with a zero task is the halt request
                              written by fd_tpool_worker_pop. */
      75           0 :     if( FD_UNLIKELY( !_task ) ) break; /* Got halt */
      76             : 
      77             :     /* We are EXEC ... do the task and then transition to IDLE */
      78             : 
      79           0 : #   ifdef __cplusplus
      80           0 :     try {
      81           0 : #   endif
      82             : 
                             /* arg_cnt==UINT_MAX selects the fd_tpool_task_t calling
                                convention, with its twelve arguments taken from arg[0]
                                through arg[11].  Any other value selects
                                fd_tpool_task_v2_t, which receives the tpool, this
                                worker's index, the argument count and the argument array
                                as is. */
      83           0 :       if( _arg_cnt==UINT_MAX ) {
      84             : 
      85           0 :         fd_tpool_task_t task = (fd_tpool_task_t)_task;
      86             : 
      87           0 :         void * task_tpool  = (void *)arg[ 0];
      88           0 :         ulong  task_t0     =         arg[ 1]; ulong task_t1     = arg[ 2];
      89           0 :         void * task_args   = (void *)arg[ 3];
      90           0 :         void * task_reduce = (void *)arg[ 4]; ulong task_stride = arg[ 5];
      91           0 :         ulong  task_l0     =         arg[ 6]; ulong task_l1     = arg[ 7];
      92           0 :         ulong  task_m0     =         arg[ 8]; ulong task_m1     = arg[ 9];
      93           0 :         ulong  task_n0     =         arg[10]; ulong task_n1     = arg[11];
      94             : 
      95           0 :         task( task_tpool,task_t0,task_t1, task_args, task_reduce,task_stride, task_l0,task_l1, task_m0,task_m1, task_n0,task_n1 );
      96             : 
      97           0 :       } else {
      98             : 
      99           0 :         fd_tpool_task_v2_t task = (fd_tpool_task_v2_t)_task;
     100             : 
     101           0 :         task( tpool, worker_idx, (ulong)_arg_cnt, arg );
     102             : 
     103           0 :       }
     104             : 
                           /* When built as C++, an exception escaping the task is logged
                              and swallowed so the worker keeps running. */
     105           0 : #   ifdef __cplusplus
     106           0 :     } catch( ... ) {
     107           0 :       FD_LOG_WARNING(( "uncaught exception; attempting to continue" ));
     108           0 :     }
     109           0 : #   endif
     110             : 
     111           0 :     FD_COMPILER_MFENCE();
     112             : 
                           /* Record that the task posted as seq0 has finished.
                              NOTE(review): presumably this store is what dispatchers poll
                              to see the worker as idle again -- confirm in fd_tpool.h. */
     113           0 :     worker->seq1 = seq0;
     114           0 :     seq1 = seq0;
     115           0 :   }
     116             : 
     117             :   /* We are HALT ... clean up and terminate */
     118             : 
                         /* Release and destroy the sleep mode primitives created during
                            boot.  Failures here are logged but do not change the return
                            value. */
     119           0 : # if FD_HAS_THREADS
     120           0 :   if( FD_UNLIKELY( sleeper ) ) {
     121           0 :     if( FD_UNLIKELY( pthread_mutex_unlock ( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
     122           0 :     if( FD_UNLIKELY( pthread_cond_destroy ( wake ) ) ) FD_LOG_WARNING(( "pthread_cond_destroy failed; attempting to continue" ));
     123           0 :     if( FD_UNLIKELY( pthread_mutex_destroy( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_destroy failed; attempting to continue" ));
     124           0 :   }
     125           0 : # endif
     126             : 
     127           0 :   return 0;
     128           0 : }
     129             : 
     130             : #if FD_HAS_THREADS
     /* fd_tpool_private_wake signals the condition variable of the given
        worker.  The mutex and condition variable addresses are recovered
        from the ulong fields worker->lock and worker->wake, which
        fd_tpool_private_worker_ fills in at boot.  The signal is sent
        with the mutex held.  Each pthread failure is logged as a warning
        and the function carries on to the next step regardless.
        NOTE(review): the worker only initializes these objects in sleep
        mode; the visible caller (fd_tpool_worker_pop) checks
        FD_TPOOL_OPT_SLEEP before calling -- confirm other callers do the
        same. */
     131             : void
     132           0 : fd_tpool_private_wake( fd_tpool_private_worker_t * worker ) {
     133           0 :   pthread_mutex_t * lock = (pthread_mutex_t *)worker->lock;
     134           0 :   pthread_cond_t *  wake = (pthread_cond_t  *)worker->wake;
     135           0 :   if( FD_UNLIKELY( pthread_mutex_lock  ( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_lock failed; attempting to continue" ));
     136           0 :   if( FD_UNLIKELY( pthread_cond_signal ( wake ) ) ) FD_LOG_WARNING(( "pthread_cond_signal failed; attempting to continue" ));
     137           0 :   if( FD_UNLIKELY( pthread_mutex_unlock( lock ) ) ) FD_LOG_WARNING(( "pthread_mutex_unlock failed; attempting to continue" ));
     138           0 : }
     139             : #endif
     140             : 
     /* fd_tpool_align returns FD_TPOOL_ALIGN, the alignment that
        fd_tpool_init requires of the memory region it is given. */
     141             : ulong
     142           0 : fd_tpool_align( void ) {
     143           0 :   return FD_TPOOL_ALIGN;
     144           0 : }
     145             : 
     /* fd_tpool_footprint returns the number of bytes needed for a tpool
        that can hold up to worker_max workers, or 0 if worker_max is not
        in [1,FD_TILE_MAX].  The size is one fd_tpool_private_worker_t
        plus one fd_tpool_t plus worker_max worker pointers, rounded up to
        a multiple of FD_TPOOL_ALIGN. */
     146             : ulong
     147           0 : fd_tpool_footprint( ulong worker_max ) {
                         /* Both comparisons yield 0 or 1, so the bitwise & gives the
                            same truth value as && would. */
     148           0 :   if( FD_UNLIKELY( !((1UL<=worker_max) & (worker_max<=FD_TILE_MAX)) ) ) return 0UL;
     149           0 :   return fd_ulong_align_up( sizeof(fd_tpool_private_worker_t) +
     150           0 :                             sizeof(fd_tpool_t) + worker_max*sizeof(fd_tpool_private_worker_t *), FD_TPOOL_ALIGN );
     151           0 : }
     152             : 
     /* fd_tpool_init formats the memory region mem as a tpool able to
        hold up to worker_max workers and returns a handle to it.  opt is
        stored verbatim in the tpool (fd_tpool_private_worker_ and
        fd_tpool_worker_pop later test it for FD_TPOOL_OPT_SLEEP).

        Returns NULL, after logging a warning, if mem is NULL, if mem is
        not aligned to fd_tpool_align(), or if fd_tpool_footprint rejects
        worker_max.  On success the whole footprint has been zeroed and
        the tpool holds exactly one worker, worker 0, whose state sits at
        the very start of mem. */
     153             : fd_tpool_t *
     154             : fd_tpool_init( void * mem,
     155             :                ulong  worker_max,
     156           0 :                ulong  opt ) {
     157             : 
     158           0 :   FD_COMPILER_MFENCE();
     159             : 
     160           0 :   if( FD_UNLIKELY( !mem ) ) {
     161           0 :     FD_LOG_WARNING(( "NULL mem" ));
     162           0 :     return NULL;
     163           0 :   }
     164             : 
     165           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_tpool_align() ) ) ) {
     166           0 :     FD_LOG_WARNING(( "bad alignment" ));
     167           0 :     return NULL;
     168           0 :   }
     169             : 
                         /* A zero footprint is how fd_tpool_footprint reports an out of
                            range worker_max. */
     170           0 :   ulong footprint = fd_tpool_footprint( worker_max );
     171           0 :   if( FD_UNLIKELY( !footprint ) ) {
     172           0 :     FD_LOG_WARNING(( "bad worker_max" ));
     173           0 :     return NULL;
     174           0 :   }
     175             : 
     176           0 :   fd_memset( mem, 0, footprint );
     177             : 
                         /* Worker 0's state occupies the first bytes of the region. */
     178           0 :   fd_tpool_private_worker_t * worker0 = (fd_tpool_private_worker_t *)mem;
     179             : 
                         /* NOTE(review): worker 0 is left with seq0!=seq1 -- presumably
                            so that it never reads as idle; confirm against
                            fd_tpool_worker_idle in fd_tpool.h. */
     180           0 :   worker0->seq0 = 1U;
     181           0 :   worker0->seq1 = 0U;
     182             : 
                         /* The tpool header follows directly after worker 0's state. */
     183           0 :   fd_tpool_t * tpool = (fd_tpool_t *)(worker0+1);
     184             : 
     185           0 :   tpool->opt        = opt;
     186           0 :   tpool->worker_max = (uint)worker_max;
     187           0 :   tpool->worker_cnt = 1U;
     188             : 
     189           0 :   FD_COMPILER_MFENCE();
     190           0 :   fd_tpool_private_worker( tpool )[0] = worker0;
     191           0 :   FD_COMPILER_MFENCE();
     192             : 
     193           0 :   return tpool;
     194           0 : }
     195             : 
     /* fd_tpool_fini tears down a tpool: it pops workers one at a time
        until only worker 0 is left and then returns the value of
        fd_tpool_private_worker0( tpool ).  Returns NULL, after logging a
        warning, if tpool is NULL or if any fd_tpool_worker_pop fails (in
        which case the workers popped so far stay popped).
        NOTE(review): fd_tpool_init places worker 0 at the start of the
        region it was given, so the returned pointer is presumably that
        original region -- confirm against fd_tpool_private_worker0. */
     196             : void *
     197           0 : fd_tpool_fini( fd_tpool_t * tpool ) {
     198             : 
     199           0 :   FD_COMPILER_MFENCE();
     200             : 
     201           0 :   if( FD_UNLIKELY( !tpool ) ) {
     202           0 :     FD_LOG_WARNING(( "NULL tpool" ));
     203           0 :     return NULL;
     204           0 :   }
     205             : 
     206           0 :   while( fd_tpool_worker_cnt( tpool )>1UL ) {
     207           0 :     if( FD_UNLIKELY( !fd_tpool_worker_pop( tpool ) ) ) {
     208           0 :       FD_LOG_WARNING(( "fd_tpool_worker_pop failed" ));
     209           0 :       return NULL;
     210           0 :     }
     211           0 :   }
     212             : 
     213           0 :   return (void *)fd_tpool_private_worker0( tpool );
     214           0 : }
     215             : 
     /* fd_tpool_worker_push starts a worker on tile tile_idx and appends
        it to the tpool as the next worker index.  Returns tpool on
        success.  Returns NULL, after logging a warning, if tpool is
        NULL, if tile_idx is 0, is the calling tile, or is not less than
        fd_tile_cnt(), if the tpool already holds worker_max workers, if
        tile_idx already backs a worker of this tpool, or if
        fd_tile_exec_new fails.  worker_cnt is left unchanged on every
        failure path. */
     216             : fd_tpool_t *
     217             : fd_tpool_worker_push( fd_tpool_t * tpool,
     218           0 :                       ulong        tile_idx ) {
     219             : 
     220           0 :   FD_COMPILER_MFENCE();
     221             : 
     222           0 :   if( FD_UNLIKELY( !tpool ) ) {
     223           0 :     FD_LOG_WARNING(( "NULL tpool" ));
     224           0 :     return NULL;
     225           0 :   }
     226             : 
     227           0 :   if( FD_UNLIKELY( !tile_idx ) ) {
     228           0 :     FD_LOG_WARNING(( "cannot push tile_idx 0" ));
     229           0 :     return NULL;
     230           0 :   }
     231             : 
     232           0 :   if( FD_UNLIKELY( tile_idx==fd_tile_idx() ) ) {
     233           0 :     FD_LOG_WARNING(( "cannot push self" ));
     234           0 :     return NULL;
     235           0 :   }
     236             : 
     237           0 :   if( FD_UNLIKELY( tile_idx>=fd_tile_cnt() ) ) {
     238           0 :     FD_LOG_WARNING(( "invalid tile_idx" ));
     239           0 :     return NULL;
     240           0 :   }
     241             : 
     242           0 :   fd_tpool_private_worker_t ** worker     = fd_tpool_private_worker( tpool );
     243           0 :   ulong                        worker_cnt = (ulong)tpool->worker_cnt;
     244             : 
     245           0 :   if( FD_UNLIKELY( worker_cnt>=(ulong)tpool->worker_max ) ) {
     246           0 :     FD_LOG_WARNING(( "too many workers" ));
     247           0 :     return NULL;
     248           0 :   }
     249             : 
                         /* Reject a tile that already backs one of the current workers. */
     250           0 :   for( ulong worker_idx=0UL; worker_idx<worker_cnt; worker_idx++ )
     251           0 :     if( worker[ worker_idx ]->tile_idx==tile_idx ) {
     252           0 :       FD_LOG_WARNING(( "tile_idx already added to tpool" ));
     253           0 :       return NULL;
     254           0 :     }
     255             : 
                         /* cfg is a local of this function.  The new worker copies both
                            fields out before it publishes itself, and this function does
                            not return on the success path until it has seen that
                            publication. */
     256           0 :   fd_tpool_private_worker_cfg_t cfg[1];
     257             : 
     258           0 :   cfg->tpool    = tpool;
     259           0 :   cfg->tile_idx = tile_idx;
     260             : 
                         /* The new worker's index and the cfg pointer travel through the
                            tile's argc / argv. */
     261           0 :   int     argc = (int)(uint)worker_cnt;
     262           0 :   char ** argv = (char **)fd_type_pun( cfg );
     263             : 
                         /* Clear the slot first so the spin below can tell when the
                            worker has written its own address there. */
     264           0 :   FD_COMPILER_MFENCE();
     265           0 :   worker[ worker_cnt ] = NULL;
     266           0 :   FD_COMPILER_MFENCE();
     267             : 
     268           0 :   if( FD_UNLIKELY( !fd_tile_exec_new( tile_idx, fd_tpool_private_worker_, argc, argv ) ) ) {
     269           0 :     FD_LOG_WARNING(( "fd_tile_exec_new failed (tile probably already in use)" ));
     270           0 :     return NULL;
     271           0 :   }
     272             : 
                         /* Wait for the worker to finish booting and publish itself. */
     273           0 :   while( !FD_VOLATILE_CONST( worker[ worker_cnt ] ) ) FD_SPIN_PAUSE();
     274             : 
                         /* Only now is the new worker counted as part of the tpool. */
     275           0 :   tpool->worker_cnt = (uint)(worker_cnt + 1UL);
     276           0 :   return tpool;
     277           0 : }
     278             : 
     /* fd_tpool_worker_pop halts the highest indexed worker of the tpool
        and removes it.  Returns tpool on success.  Returns NULL, after
        logging a warning, if tpool is NULL, if only worker 0 remains, or
        if fd_tpool_worker_idle reports the worker to pop as not idle.
        An error string or non-zero return from the halted tile is logged
        but does not stop the pop: worker_cnt is still decremented and
        tpool is still returned. */
     279             : fd_tpool_t *
     280           0 : fd_tpool_worker_pop( fd_tpool_t * tpool ) {
     281             : 
     282           0 :   FD_COMPILER_MFENCE();
     283             : 
     284           0 :   if( FD_UNLIKELY( !tpool ) ) {
     285           0 :     FD_LOG_WARNING(( "NULL tpool" ));
     286           0 :     return NULL;
     287           0 :   }
     288             : 
     289           0 :   ulong worker_cnt = (ulong)tpool->worker_cnt;
     290           0 :   if( FD_UNLIKELY( worker_cnt<=1UL ) ) {
     291           0 :     FD_LOG_WARNING(( "no workers to pop" ));
     292           0 :     return NULL;
     293           0 :   }
     294             : 
     295             :   /* Testing for IDLE isn't strictly necessary given requirements to use
     296             :      this and this isn't being done atomically with the actually pop but
     297             :      does help catch obvious user errors. */
     298             : 
     299           0 :   if( FD_UNLIKELY( !fd_tpool_worker_idle( tpool, worker_cnt-1UL ) ) ) {
     300           0 :     FD_LOG_WARNING(( "worker to pop is not idle" ));
     301           0 :     return NULL;
     302           0 :   }
     303             : 
     304             :   /* Send HALT to the worker */
     305             : 
                         /* The tile's exec handle is looked up here, before the halt is
                            posted, and used afterwards to wait for the tile. */
     306           0 :   fd_tpool_private_worker_t * worker = fd_tpool_private_worker( tpool )[ worker_cnt-1UL ];
     307           0 :   uint                        seq0   = worker->seq0 + 1U;
     308           0 :   fd_tile_exec_t *            exec   = fd_tile_exec( worker->tile_idx );
     309             : 
                         /* Zero the task first and bump seq0 second.  The worker reads
                            seq0 before task, and treats a new seq0 with a zero task as
                            the halt request.  In sleep mode the worker must also be
                            signaled so it gets to look at seq0 at all. */
     310           0 :   worker->task = 0UL;
     311           0 :   FD_COMPILER_MFENCE();
     312           0 :   worker->seq0 = seq0;
     313           0 :   FD_COMPILER_MFENCE();
     314           0 :   if( FD_UNLIKELY( tpool->opt & FD_TPOOL_OPT_SLEEP ) ) fd_tpool_private_wake( worker );
     315             : 
     316             :   /* Wait for the worker to shutdown */
     317             : 
     318           0 :   int          ret;
     319           0 :   char const * err = fd_tile_exec_delete( exec, &ret );
     320           0 :   if(      FD_UNLIKELY( err ) ) FD_LOG_WARNING(( "tile err \"%s\" unexpected; attempting to continue", err ));
     321           0 :   else if( FD_UNLIKELY( ret ) ) FD_LOG_WARNING(( "tile ret %i unexpected; attempting to continue", ret ));
     322             : 
     323           0 :   tpool->worker_cnt = (uint)(worker_cnt - 1UL);
     324           0 :   return tpool;
     325           0 : }
     326             : 
     /* FD_TPOOL_EXEC_ALL_IMPL_HDR(style) expands to the signature and
        opening part of fd_tpool_private_exec_all_<style>_node.  The
        parameter list lines up position for position with the twelve
        argument fd_tpool_task_t call made by fd_tpool_private_worker_;
        the two slots after l0,l1 carry the user task (_task) and an
        opaque _tpool value, and the last two carry the overall thread
        range [t0,t1).

        The opening part fans the work out over the thread range
        [node_t0,node_t1): while that range holds more than one thread,
        it picks a split point node_ts with fd_tpool_private_split, uses
        fd_tpool_exec to run this same function on worker node_ts for
        [node_ts,node_t1), remembers node_ts in wait_child, and keeps
        [node_t0,node_ts) for itself.  The style specific statements that
        follow the macro then run with the locals node_tpool, task,
        node_t0, node_t1 and the parameters in scope.

        NOTE(review): wait_child has 16 ushort slots.  The ushort width
        matches the stated tpool_cnt<=65536 assumption; 16 slots is enough
        only if fd_tpool_private_split at least halves the range on every
        pass -- confirm. */
     327             : #define FD_TPOOL_EXEC_ALL_IMPL_HDR(style)                                                          \
     328             : void                                                                                               \
     329             : fd_tpool_private_exec_all_##style##_node( void * _node_tpool,                                      \
     330             :                                           ulong  node_t0, ulong node_t1,                           \
     331             :                                           void * args,                                             \
     332             :                                           void * reduce,  ulong stride,                            \
     333             :                                           ulong  l0,      ulong l1,                                \
     334             :                                           ulong  _task,   ulong _tpool,                            \
     335           0 :                                           ulong  t0,      ulong t1 ) {                             \
     336           0 :   fd_tpool_t *    node_tpool = (fd_tpool_t *   )_node_tpool;                                       \
     337           0 :   fd_tpool_task_t task       = (fd_tpool_task_t)_task;                                             \
     338           0 :   ulong           wait_cnt   = 0UL;                                                                \
     339           0 :   ushort          wait_child[16];   /* Assumes tpool_cnt<=65536 */                                 \
     340           0 :   for(;;) {                                                                                        \
     341           0 :     ulong node_t_cnt = node_t1 - node_t0;                                                          \
     342           0 :     if( node_t_cnt<=1L ) break;                                                                    \
     343           0 :     ulong node_ts = node_t0 + fd_tpool_private_split( node_t_cnt );                                \
     344           0 :     fd_tpool_exec( node_tpool, node_ts, fd_tpool_private_exec_all_##style##_node,                  \
     345           0 :                    node_tpool, node_ts,node_t1, args, reduce,stride, l0,l1, _task,_tpool, t0,t1 ); \
     346           0 :     wait_child[ wait_cnt++ ] = (ushort)node_ts;                                                    \
     347           0 :     node_t1 = node_ts;                                                                             \
     348           0 :   }
     349             : 
     /* FD_TPOOL_EXEC_ALL_IMPL_FTR expands to the closing part of the same
        function: it calls fd_tpool_wait on every worker recorded in
        wait_child, most recently dispatched first, and then closes the
        function body. */
     350             : #define FD_TPOOL_EXEC_ALL_IMPL_FTR                                                \
     351           0 :   while( wait_cnt ) fd_tpool_wait( node_tpool, (ulong)wait_child[ --wait_cnt ] ); \
     352           0 : }
     353             : 
     /* fd_tpool_private_exec_all_rrobin_node: after the fan out done by
        the header macro, this thread starts at index l0+(node_t0-t0) and
        steps by the total thread count t1-t0, calling task once per
        index with the single element range [m,m+1) until m reaches l1.
        Both the starting offset and the step are clamped with
        fd_ulong_min so that m cannot wrap past ULONG_MAX. */
     354           0 : FD_TPOOL_EXEC_ALL_IMPL_HDR(rrobin)
     355           0 :   ulong m_stride = t1-t0;
     356           0 :   ulong m        = l0 + fd_ulong_min( node_t0-t0, ULONG_MAX-l0 ); /* robust against overflow */
     357           0 :   while( m<l1 ) {
     358           0 :     task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
     359           0 :     m += fd_ulong_min( m_stride, ULONG_MAX-m ); /* robust against overflow */
     360           0 :   }
     361           0 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     362             : 
     /* fd_tpool_private_exec_all_block_node: after the fan out done by
        the header macro, this thread obtains a range [m0,m1) from
        FD_TPOOL_PARTITION and calls task once for every index m in it,
        each time with the single element range [m,m+1).
        NOTE(review): FD_TPOOL_PARTITION is presumably handing thread
        node_t0-t0 of t1-t0 its contiguous share of [l0,l1) -- confirm in
        fd_tpool.h. */
     363           0 : FD_TPOOL_EXEC_ALL_IMPL_HDR(block)
     364           0 :   ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
     365           0 :   for( ulong m=m0; m<m1; m++ ) task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m,m+1UL, node_t0,node_t1 );
     366           0 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     367             : 
     368             : #if FD_HAS_ATOMIC
     /* fd_tpool_private_exec_all_taskq_node: after the fan out done by
        the header macro, every participating thread pulls indices from
        one shared counter.  For this style _tpool is not a tpool but the
        address of a ulong array: element 0 is the next unclaimed index
        and element 1 holds the pointer that is handed to task as its
        tpool argument.  A thread claims index m0 by advancing the
        counter from m0 to m0+1 with a compare and swap, runs task on
        [m0,m0+1), and stops once the counter has reached l1. */
     369           0 : FD_TPOOL_EXEC_ALL_IMPL_HDR(taskq)
     370           0 :   ulong * l_next = (ulong *)_tpool;
     371           0 :   void  * tpool  = (void *)l_next[1];
     372           0 :   for(;;) {
     373             : 
     374             :     /* Note that we use an ATOMIC_CAS here instead of an
     375             :        ATOMIC_FETCH_AND_ADD to avoid overflow risks by having threads
     376             :        increment l0 into the tail.  ATOMIC_FETCH_AND_ADD could be used
     377             :        if there is no requirement to the effect that l1+FD_TILE_MAX does
     378             :        not overflow. */
     379             : 
     380           0 :     FD_COMPILER_MFENCE();
     381           0 :     ulong m0 = *l_next;
     382           0 :     FD_COMPILER_MFENCE();
     383             : 
     384           0 :     if( FD_UNLIKELY( m0>=l1 ) ) break;
     385           0 :     ulong m1 = m0+1UL;
                           /* Another thread claimed m0 first: pause and reread the
                              counter. */
     386           0 :     if( FD_UNLIKELY( FD_ATOMIC_CAS( l_next, m0, m1 )!=m0 ) ) {
     387           0 :       FD_SPIN_PAUSE();
     388           0 :       continue;
     389           0 :     }
     390             : 
     391           0 :     task( tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
     392           0 :   }
     393           0 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     394             : #endif
     395             : 
     /* fd_tpool_private_exec_all_batch_node: after the fan out done by
        the header macro, this thread obtains a range [m0,m1) from
        FD_TPOOL_PARTITION (same arguments as the block style) and calls
        task exactly once with that whole range, rather than once per
        index. */
     396           0 : FD_TPOOL_EXEC_ALL_IMPL_HDR(batch)
     397           0 :   ulong m0; ulong m1; FD_TPOOL_PARTITION( l0,l1,1UL, node_t0-t0,t1-t0, m0,m1 );
     398           0 :   task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, m0,m1, node_t0,node_t1 );
     399           0 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     400             : 
     /* fd_tpool_private_exec_all_raw_node: after the fan out done by the
        header macro, this thread calls task exactly once, passing l0,l1
        unchanged in the m0,m1 positions.  No partitioning is done here;
        the thread's own sub-range [node_t0,node_t1) is passed in the last
        two positions. */
     401           0 : FD_TPOOL_EXEC_ALL_IMPL_HDR(raw)
     402           0 :   task( (void *)_tpool,t0,t1, args,reduce,stride, l0,l1, l0,l1, node_t0,node_t1 );
     403           0 : FD_TPOOL_EXEC_ALL_IMPL_FTR
     404             : 
     405             : #undef FD_TPOOL_EXEC_ALL_IMPL_FTR
     406             : #undef FD_TPOOL_EXEC_ALL_IMPL_HDR

Generated by: LCOV version 1.14