Coverage Report

Created: 2026-05-11 06:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/freeradius-server/src/lib/io/schedule.c
Line
Count
Source
1
/*
2
 *   This program is free software; you can redistribute it and/or modify
3
 *   it under the terms of the GNU General Public License as published by
4
 *   the Free Software Foundation; either version 2 of the License, or
5
 *   (at your option) any later version.
6
 *
7
 *   This program is distributed in the hope that it will be useful,
8
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *   GNU General Public License for more details.
11
 *
12
 *   You should have received a copy of the GNU General Public License
13
 *   along with this program; if not, write to the Free Software
14
 *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15
 */
16
17
/**
18
 * $Id: 4c6ba355f46cf1d16003fedfdaa5a02702d293dc $
19
 *
20
 * @brief Network / worker thread scheduling
21
 * @file io/schedule.c
22
 *
23
 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24
 */
25
RCSID("$Id: 4c6ba355f46cf1d16003fedfdaa5a02702d293dc $")
26
27
0
#define LOG_DST sc->log
28
29
#include <freeradius-devel/autoconf.h>
30
31
#include <freeradius-devel/io/schedule.h>
32
#include <freeradius-devel/io/thread.h>
33
#include <freeradius-devel/util/dlist.h>
34
#include <freeradius-devel/util/rb.h>
35
#include <freeradius-devel/util/syserror.h>
36
#include <freeradius-devel/server/trigger.h>
37
#include <freeradius-devel/util/semaphore.h>
38
39
#include <pthread.h>
40
41
/** Scheduler specific information for worker threads
42
 *
43
 * Wraps a fr_worker_t, tracking additional information that
44
 * the scheduler uses.
45
 */
46
typedef struct {
47
  fr_thread_t thread;     //!< common thread structure - must be first!
48
49
  int   uses;     //!< how many network threads are using it
50
  fr_time_t cpu_time;   //!< how much CPU time this worker has used
51
52
  fr_schedule_t *sc;      //!< the scheduler we are running under
53
54
  fr_worker_t *worker;    //!< the worker data structure
55
} fr_schedule_worker_t;
56
57
/** Scheduler specific information for network threads
58
 *
59
 * Wraps a fr_network_t, tracking additional information that
60
 * the scheduler uses.
61
 */
62
typedef struct {
63
  fr_thread_t thread;     //!< common thread structure - must be first!
64
65
  fr_schedule_t *sc;      //!< the scheduler we are running under
66
67
  fr_network_t  *nr;      //!< the receive data structure
68
69
  fr_timer_t  *ev;    //!< timer for stats_interval
70
} fr_schedule_network_t;
71
72
73
/**
74
 *  The scheduler
75
 */
76
struct fr_schedule_s {
77
  bool    running;    //!< is the scheduler running?
78
79
  CONF_SECTION  *cs;      //!< thread pool configuration section
80
  fr_event_list_t *el;      //!< event list for single-threaded mode.
81
  bool    single_threaded;  //!< true if running in single-threaded mode.
82
83
  fr_log_t  *log;     //!< log destination
84
  fr_log_lvl_t  lvl;      //!< log level
85
86
  fr_schedule_config_t *config;   //!< configuration
87
88
  unsigned int  num_workers_exited; //!< number of exited workers
89
90
  fr_sem_t  *worker_sem;    //!< for inter-thread signaling
91
  fr_sem_t  *network_sem;   //!< for inter-thread signaling
92
  fr_sem_t  *coord_sem;   //!< for inter-thread signaling
93
94
  fr_schedule_thread_instantiate_t  worker_thread_instantiate;  //!< thread instantiation callback
95
  fr_schedule_thread_detach_t   worker_thread_detach;
96
97
  fr_dlist_head_t workers;    //!< list of workers
98
  fr_dlist_head_t networks;   //!< list of networks
99
100
  fr_network_t  *single_network;  //!< for single-threaded mode
101
  fr_worker_t *single_worker;   //!< for single-threaded mode
102
};
103
104
static _Thread_local int worker_id = -1;  //!< Per-thread worker ID; stays -1 until this thread is assigned one.
105
106
/** Return the worker id for the current thread
107
 *
108
 * @return worker ID
109
 */
110
int fr_schedule_worker_id(void)
111
0
{
112
0
  return worker_id;
113
0
}
114
115
/** Explicitly set the worker id for the current thread
116
 *
117
 * **Only to be used in test programs like unit_test_module**
118
 */
119
void fr_schedule_worker_id_set(int id)
120
0
{
121
0
  worker_id = id;
122
0
}
123
124
/** Entry point for worker threads
125
 *
126
 * @param[in] arg the fr_schedule_worker_t
127
 * @return NULL
128
 */
129
static void *fr_schedule_worker_thread(void *arg)
130
0
{
131
0
  fr_schedule_worker_t    *sw = talloc_get_type_abort(arg, fr_schedule_worker_t);
132
0
  fr_schedule_t     *sc = sw->sc;
133
0
  fr_thread_status_t    status = FR_THREAD_FAIL;
134
0
  char        worker_name[32];
135
136
0
  worker_id = sw->thread.id;    /* Store the current worker ID */
137
138
0
  snprintf(worker_name, sizeof(worker_name), "Worker %d", sw->thread.id);
139
140
0
#ifdef HAVE_PTHREAD_SETNAME_NP
141
#  ifdef __APPLE__
142
  pthread_setname_np(worker_name);
143
#  else
144
0
  pthread_setname_np(pthread_self(), worker_name);
145
0
#  endif
146
0
#endif
147
148
0
  if (fr_thread_setup(&sw->thread, worker_name) < 0) goto fail;
149
150
0
  sw->worker = fr_worker_alloc(sw->thread.ctx, sw->thread.el, worker_name, sc->log, sc->lvl, &sc->config->worker);
151
0
  if (!sw->worker) {
152
0
    PERROR("%s - Failed creating worker", worker_name);
153
0
    goto fail;
154
0
  }
155
156
  /*
157
   *  @todo make this a registry
158
   */
159
0
  if (sc->worker_thread_instantiate) {
160
0
    CONF_SECTION  *cs;
161
0
    char    section_name[32];
162
163
0
    snprintf(section_name, sizeof(section_name), "%u", sw->thread.id);
164
165
0
    cs = cf_section_find(sc->cs, "worker", section_name);
166
0
    if (!cs) cs = cf_section_find(sc->cs, "worker", NULL);
167
168
0
    if (sc->worker_thread_instantiate(sw->thread.ctx, sw->thread.el, cs) < 0) {
169
0
      PERROR("%s - Worker thread instantiation failed", worker_name);
170
0
      goto fail;
171
0
    }
172
0
  }
173
174
  /*
175
   *  Add this worker to all network threads.
176
   */
177
0
  fr_dlist_foreach(&sc->networks, fr_schedule_network_t, sn) {
178
0
    if (unlikely(fr_network_worker_add(sn->nr, sw->worker) < 0)) {
179
0
      PERROR("%s - Failed adding worker to network %u", worker_name, sn->thread.id);
180
0
      goto fail;  /* FIXME - Should maybe try to undo partial adds? */
181
0
    }
182
0
  }
183
184
  /*
185
   *  Tell the originator that the thread has started.
186
   */
187
0
  fr_thread_start(&sw->thread, sc->worker_sem);
188
189
  /*
190
   *  Do all of the work.
191
   */
192
0
  fr_worker(sw->worker);
193
194
0
  status = FR_THREAD_EXITED;
195
196
0
fail:
197
0
  if (sw->worker) {
198
0
    fr_worker_destroy(sw->worker);
199
0
    sw->worker = NULL;
200
0
  }
201
202
0
  if (sc->worker_thread_detach) sc->worker_thread_detach(NULL); /* Fixme once we figure out what uctx should be */
203
204
0
  fr_thread_exit(&sw->thread, status, sc->worker_sem);
205
206
0
  return NULL;
207
0
}
208
209
210
/** Periodic timer callback which logs network statistics
 *
 * Logs the statistics of the network to the scheduler's log destination,
 * then re-arms itself to fire again stats_interval after "now".
 *
 * @param[in] tl	timer list the next event is inserted into.
 * @param[in] now	the time passed to the callback; the next run is relative to it.
 * @param[in] uctx	the fr_schedule_network_t
 */
static void stats_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
{
	fr_schedule_network_t	*sn = talloc_get_type_abort(uctx, fr_schedule_network_t);
	fr_schedule_t		*sc = sn->sc;
	fr_time_t		next;

	fr_network_stats_log(sn->nr, sc->log);

	next = fr_time_add(now, sc->config->stats_interval);
	(void) fr_timer_at(sn, tl, &sn->ev, next, false, stats_timer, sn);
}
218
219
/** Initialize and run the network thread.
220
 *
221
 * @param[in] arg the fr_schedule_network_t
222
 * @return NULL
223
 */
224
static void *fr_schedule_network_thread(void *arg)
225
0
{
226
0
  fr_schedule_network_t   *sn = talloc_get_type_abort(arg, fr_schedule_network_t);
227
0
  fr_schedule_t     *sc = sn->sc;
228
0
  fr_thread_status_t    status = FR_THREAD_FAIL;
229
0
  char        network_name[32];
230
231
0
  snprintf(network_name, sizeof(network_name), "Network %d", sn->thread.id);
232
233
0
#ifdef HAVE_PTHREAD_SETNAME_NP
234
#  ifdef __APPLE__
235
  pthread_setname_np(network_name);
236
#  else
237
0
  pthread_setname_np(pthread_self(), network_name);
238
0
#  endif
239
0
#endif
240
241
0
  if (fr_thread_setup(&sn->thread, network_name) < 0) goto fail;
242
243
0
  sn->nr = fr_network_create(sn->thread.ctx, sn->thread.el, network_name, sc->log, sc->lvl, &sc->config->network);
244
0
  if (!sn->nr) {
245
0
    PERROR("%s - Failed creating network", network_name);
246
0
    goto fail;
247
0
  }
248
249
  /*
250
   *  Tell the originator that the thread has started.
251
   */
252
0
  fr_thread_start(&sn->thread, sc->network_sem);
253
254
  /*
255
   *  Print out statistics for this network IO handler.
256
   */
257
0
  if (fr_time_delta_ispos(sc->config->stats_interval)) {
258
0
    (void) fr_timer_in(sn, sn->thread.el->tl, &sn->ev, sn->sc->config->stats_interval, false, stats_timer, sn);
259
0
  }
260
261
  /*
262
   *  Call the main event processing loop of the network
263
   *  thread Will not return until the worker is about
264
   *      to exit.
265
   */
266
0
  fr_network(sn->nr);
267
268
0
  status = FR_THREAD_EXITED;
269
270
0
fail:
271
0
  fr_thread_exit(&sn->thread, status, sc->network_sem);
272
273
0
  return NULL;
274
0
}
275
276
/** Create a scheduler and spawn the child threads.
277
 *
278
 * @param[in] ctx       talloc context.
279
 * @param[in] single_threaded     no workers are spawned, everything runs in a common event loop.
280
 * @param[in] el        event list, only for single-threaded mode.
281
 * @param[in] logger        destination for all logging messages.
282
 * @param[in] lvl       log level.
283
 * @param[in] worker_thread_instantiate   callback for new worker threads.
284
 * @param[in] worker_thread_detach    callback to destroy resources
285
 *            allocated by worker_thread_instantiate.
286
 * @param[in] config        configuration for the scheduler
287
 * @return
288
 *  - NULL on error
289
 *  - fr_schedule_t new scheduler
290
 */
291
fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx,
292
          bool single_threaded,
293
          fr_event_list_t *el,
294
          fr_log_t *logger, fr_log_lvl_t lvl,
295
          fr_schedule_thread_instantiate_t worker_thread_instantiate,
296
          fr_schedule_thread_detach_t worker_thread_detach,
297
          fr_schedule_config_t *config)
298
0
{
299
0
  unsigned int i;
300
0
  fr_schedule_worker_t *sw, *next_sw;
301
0
  fr_schedule_network_t *sn, *next_sn;
302
0
  fr_schedule_t *sc;
303
304
0
  sc = talloc_zero(ctx, fr_schedule_t);
305
0
  if (!sc) {
306
0
    fr_strerror_const("Failed allocating memory");
307
0
    return NULL;
308
0
  }
309
310
  /*
311
   *  Parse any scheduler-specific configuration.
312
   */
313
0
  if (!config) {
314
0
    MEM(sc->config = talloc_zero(sc, fr_schedule_config_t));
315
0
    sc->config->max_networks = 1;
316
0
    sc->config->max_workers = 4;
317
0
  } else {
318
0
    sc->config = config;
319
320
0
    if (sc->config->max_networks < 1) sc->config->max_networks = 1;
321
0
    if (sc->config->max_networks > 64) sc->config->max_networks = 64;
322
0
    if (sc->config->max_workers < 1) sc->config->max_workers = 1;
323
0
    if (sc->config->max_workers > 64) sc->config->max_workers = 64;
324
0
  }
325
326
0
  sc->el = el;
327
0
  sc->single_threaded = single_threaded;
328
0
  sc->log = logger;
329
0
  sc->lvl = lvl;
330
0
  sc->cs = sc->config->cs;
331
332
0
  sc->worker_thread_instantiate = worker_thread_instantiate;
333
0
  sc->worker_thread_detach = worker_thread_detach;
334
0
  sc->running = true;
335
336
  /*
337
   *  If we're single-threaded, create network / worker, and insert them into the event loop.
338
   */
339
0
  if (single_threaded) {
340
0
    sc->single_network = fr_network_create(sc, el, "Network", sc->log, sc->lvl, &sc->config->network);
341
0
    if (!sc->single_network) {
342
0
      PERROR("Failed creating network");
343
0
    pre_instantiate_st_fail:
344
0
      talloc_free(sc);
345
0
      return NULL;
346
0
    }
347
348
0
    if (fr_coords_create(sc, el) < 0) {
349
0
      PERROR("Failed creating coordinators");
350
0
      if (unlikely(fr_network_destroy(sc->single_network) < 0)) {
351
0
        PERROR("Failed destroying network");
352
0
      }
353
0
      goto pre_instantiate_st_fail;
354
0
    }
355
356
0
    worker_id = 0;
357
0
    sc->single_worker = fr_worker_alloc(sc, el, "Worker", sc->log, sc->lvl, &sc->config->worker);
358
0
    if (!sc->single_worker) {
359
0
      PERROR("Failed creating worker");
360
0
      if (unlikely(fr_network_destroy(sc->single_network) < 0)) {
361
0
        PERROR("Failed destroying network");
362
0
      }
363
0
      goto pre_instantiate_st_fail;
364
0
    }
365
366
    /*
367
     *  Parent thread-specific data from the single_worker
368
     */
369
0
    if (sc->worker_thread_instantiate) {
370
0
      CONF_SECTION *subcs;
371
372
0
      subcs = cf_section_find(sc->cs, "worker", "0");
373
0
      if (!subcs) subcs = cf_section_find(sc->cs, "worker", NULL);
374
375
0
      if (sc->worker_thread_instantiate(sc->single_worker, el, subcs) < 0) {
376
0
        PERROR("Worker thread instantiation failed");
377
0
      destroy_both:
378
0
        if (unlikely(fr_network_destroy(sc->single_network) < 0)) {
379
0
          PERROR("Failed destroying network");
380
0
        }
381
0
        fr_worker_destroy(sc->single_worker);
382
0
        goto pre_instantiate_st_fail;
383
0
      }
384
0
    }
385
386
0
    if (fr_command_register_hook(NULL, "0", sc->single_worker, cmd_worker_table) < 0) {
387
0
      PERROR("Failed adding worker commands");
388
0
    st_fail:
389
0
      if (sc->worker_thread_detach) sc->worker_thread_detach(NULL);
390
0
      goto destroy_both;
391
0
    }
392
393
0
    if (fr_command_register_hook(NULL, "0", sc->single_network, cmd_network_table) < 0) {
394
0
      PERROR("Failed adding network commands");
395
0
      goto st_fail;
396
0
    }
397
398
    /*
399
     *  Register the worker with the network, so
400
     *  things like fr_network_send_request() work.
401
     */
402
0
    fr_network_worker_add_self(sc->single_network, sc->single_worker);
403
0
    DEBUG("Scheduler created in single-threaded mode");
404
405
0
    if (fr_event_pre_insert(el, fr_worker_pre_event, sc->single_worker) < 0) {
406
0
      fr_strerror_const("Failed adding pre-check to event list");
407
0
      goto st_fail;
408
0
    }
409
410
0
    if (fr_coord_pre_event_insert(el) < 0) {
411
0
      fr_strerror_const("Failed adding coordinator pre-check to event list");
412
0
      goto st_fail;
413
0
    }
414
415
    /*
416
     *  Add the event which processes request_t packets.
417
     */
418
0
    if (fr_event_post_insert(el, fr_worker_post_event, sc->single_worker) < 0) {
419
0
      fr_strerror_const("Failed inserting post-processing event");
420
0
      goto st_fail;
421
0
    }
422
423
0
    if (fr_coord_post_event_insert(el) < 0) {
424
0
      fr_strerror_const("Failed adding coordinator post-processing to event list");
425
0
      goto st_fail;
426
0
    }
427
428
0
    return sc;
429
0
  }
430
431
  /*
432
   *  Create the lists which hold the workers and networks.
433
   */
434
0
  fr_dlist_init(&sc->workers, fr_schedule_worker_t, thread.entry);
435
0
  fr_dlist_init(&sc->networks, fr_schedule_network_t, thread.entry);
436
437
0
  sc->network_sem = fr_sem_alloc();
438
0
  if (!sc->network_sem) {
439
0
  sem_fail:
440
0
    ERROR("Failed creating semaphore: %s", fr_syserror(errno));
441
0
    fr_sem_free(sc->network_sem);
442
0
    fr_sem_free(sc->worker_sem);
443
0
    talloc_free(sc);
444
0
    return NULL;
445
0
  }
446
447
0
  sc->worker_sem = fr_sem_alloc();
448
0
  if (!sc->worker_sem) goto sem_fail;
449
450
0
  sc->coord_sem = fr_sem_alloc();
451
0
  if (!sc->coord_sem) goto sem_fail;
452
453
  /*
454
   *  Create the network threads first.
455
   */
456
0
  for (i = 0; i < sc->config->max_networks; i++) {
457
0
    DEBUG3("Creating %u/%u networks", i + 1, sc->config->max_networks);
458
459
    /*
460
     *  Create a worker "glue" structure
461
     */
462
0
    sn = talloc_zero(sc, fr_schedule_network_t);
463
0
    if (!sn) {
464
0
      ERROR("Network %u - Failed allocating memory", i);
465
0
      break;
466
0
    }
467
468
0
    sn->thread.id = i;
469
0
    sn->sc = sc;
470
0
    sn->thread.status = FR_THREAD_INITIALIZING;
471
472
0
    if (fr_thread_create(&sn->thread.pthread_id, fr_schedule_network_thread, sn) < 0) {
473
0
      talloc_free(sn);
474
0
      PERROR("Failed creating network %u", i);
475
0
      break;
476
0
    }
477
478
0
    fr_dlist_insert_head(&sc->networks, sn);
479
0
  }
480
481
  /*
482
   *  Wait for all of the networks to signal us that either
483
   *  they've started, OR there's been a problem and they
484
   *  can't start.
485
   */
486
0
  if (fr_thread_wait_list(sc->network_sem, &sc->networks) < 0) {
487
0
    fr_schedule_destroy(&sc);
488
0
    return NULL;
489
0
  }
490
491
  /*
492
   *  Create the coordination threads
493
   */
494
0
  if (fr_coord_start(sc->config->max_workers, sc->coord_sem) < 0) {
495
0
    fr_schedule_destroy(&sc);
496
0
    return NULL;
497
0
  };
498
499
  /*
500
   *  Create all of the workers.
501
   */
502
0
  for (i = 0; i < sc->config->max_workers; i++) {
503
0
    DEBUG3("Creating %u/%u workers", i + 1, sc->config->max_workers);
504
505
    /*
506
     *  Create a worker "glue" structure
507
     */
508
0
    sw = talloc_zero(sc, fr_schedule_worker_t);
509
0
    if (!sw) {
510
0
      ERROR("Worker %u - Failed allocating memory", i);
511
0
      break;
512
0
    }
513
514
0
    sw->thread.id = i;
515
0
    sw->sc = sc;
516
0
    sw->thread.status = FR_THREAD_INITIALIZING;
517
518
0
    if (fr_thread_create(&sw->thread.pthread_id, fr_schedule_worker_thread, sw) < 0) {
519
0
      talloc_free(sw);
520
0
      PERROR("Failed creating worker %u", i);
521
0
      break;
522
0
    }
523
524
0
    fr_dlist_insert_head(&sc->workers, sw);
525
0
  }
526
527
  /*
528
   *  Wait for all of the workers to signal us that either
529
   *  they've started, OR there's been a problem and they
530
   *  can't start.
531
   */
532
0
  if (fr_thread_wait_list(sc->worker_sem, &sc->workers) < 0) {
533
0
    fr_schedule_destroy(&sc);
534
0
    return NULL;
535
0
  }
536
537
0
  for (sw = fr_dlist_head(&sc->workers), i = 0;
538
0
       sw != NULL;
539
0
       sw = next_sw, i++) {
540
0
    char buffer[32];
541
542
0
    next_sw = fr_dlist_next(&sc->workers, sw);
543
544
0
    snprintf(buffer, sizeof(buffer), "%d", i);
545
0
    if (fr_command_register_hook(NULL, buffer, sw->worker, cmd_worker_table) < 0) {
546
0
      PERROR("Failed adding worker commands");
547
0
    mt_fail:
548
0
      fr_schedule_destroy(&sc);
549
0
      return NULL;
550
0
    }
551
0
  }
552
553
0
  for (sn = fr_dlist_head(&sc->networks), i = 0;
554
0
       sn != NULL;
555
0
       sn = next_sn, i++) {
556
0
    char buffer[32];
557
558
0
    next_sn = fr_dlist_next(&sc->networks, sn);
559
560
0
    snprintf(buffer, sizeof(buffer), "%d", i);
561
0
    if (fr_command_register_hook(NULL, buffer, sn->nr, cmd_network_table) < 0) {
562
0
      PERROR("Failed adding network commands");
563
0
      goto mt_fail;
564
0
    }
565
0
  }
566
567
0
  if (sc) INFO("Scheduler created successfully with %u networks and %u workers",
568
0
         sc->config->max_networks, (unsigned int)fr_dlist_num_elements(&sc->workers));
569
570
  /*
571
   *  Instantiate thread-local data for the main thread too.
572
   *  In single-threaded mode this is done above.  In
573
   *  multi-worker mode the main thread also needs module
574
   *  thread data so that triggers can use module xlats.
575
   */
576
0
  if (sc->worker_thread_instantiate &&
577
0
      unlikely((sc->worker_thread_instantiate(sc, el, NULL) < 0))) {
578
0
    PERROR("Main thread instantiation failed");
579
0
    goto mt_fail;
580
0
  }
581
582
0
  return sc;
583
0
}
584
585
/** Destroy a scheduler, and tell its child threads to exit.
586
 *
587
 * @note This may be called with no worker or network threads in the case of a
588
 *   instantiation error.  This function _should_ deal with that condition
589
 *   gracefully.
590
 *
591
 * @param[in] sc_to_free the scheduler
592
 * @return
593
 *  - <0 on error
594
 *  - 0 on success
595
 */
596
int fr_schedule_destroy(fr_schedule_t **sc_to_free)
597
0
{
598
0
  fr_schedule_t   *sc = *sc_to_free;
599
0
  unsigned int    i;
600
0
  fr_schedule_worker_t  *sw;
601
0
  fr_schedule_network_t *sn;
602
0
  int     ret;
603
604
0
  if (!sc) return 0;
605
606
0
  sc->running = false;
607
608
609
610
  /*
611
   *  Single threaded mode: kill the only network / worker we have.
612
   */
613
0
  if (sc->single_threaded) {
614
    /*
615
     *  Destroy the network side first.  It tells the
616
     *  workers to close.
617
     */
618
0
    if (unlikely(fr_network_destroy(sc->single_network) < 0)) {
619
0
      ERROR("Failed destroying network");
620
0
    }
621
0
    fr_worker_destroy(sc->single_worker);
622
0
    fr_coords_destroy();
623
624
0
    goto done;
625
0
  } else {
626
    /*
627
     *  Detach thread-local data for the main thread.
628
     *  Worker threads handle their own detach, but
629
     *  the main thread was instantiated explicitly
630
     *  by fr_schedule_create.
631
     */
632
0
    if (sc->worker_thread_detach) sc->worker_thread_detach(NULL);
633
0
  }
634
635
  /*
636
   *  Signal each network thread to exit.
637
   */
638
0
  fr_dlist_foreach(&sc->networks, fr_schedule_network_t, sne) {
639
0
    if (fr_network_exit(sne->nr) < 0) {
640
0
      PERROR("Failed signaling network %i to exit", sne->thread.id);
641
0
    }
642
0
  }
643
644
  /*
645
   *  If the network threads are running, tell them to exit,
646
   *  and wait for them to do so.  Each network thread tells
647
   *  all of its worker threads that it's exiting.  It then
648
   *  closes the channels.  When the workers see that there
649
   *  are no input channels, they exit, too.
650
   */
651
0
  for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->networks); i++) {
652
0
    DEBUG2("Scheduler - Waiting for semaphore indicating network exit %u/%u", i + 1,
653
0
           (unsigned int)fr_dlist_num_elements(&sc->networks));
654
0
    SEM_WAIT_INTR(sc->network_sem);
655
0
  }
656
0
  DEBUG2("Scheduler - All networks indicated exit complete");
657
658
0
  while ((sn = fr_dlist_pop_head(&sc->networks)) != NULL) {
659
    /*
660
     *  Ensure that the thread has exited before
661
     *  cleaning up the context.
662
     *
663
     *  This also ensures that the child threads have
664
     *  exited before the main thread cleans up the
665
     *  module instances.
666
     */
667
0
    if ((ret = pthread_join(sn->thread.pthread_id, NULL)) != 0) {
668
0
      ERROR("Failed joining network %i: %s", sn->thread.id, fr_syserror(ret));
669
0
    } else {
670
0
      DEBUG2("Network %i joined (cleaned up)", sn->thread.id);
671
0
    }
672
0
  }
673
674
  /*
675
   *  Wait for all worker threads to finish.  THEN clean up
676
   *  modules.  Otherwise, the modules will be removed from
677
   *  underneath the workers!
678
   */
679
0
  for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->workers); i++) {
680
0
    DEBUG2("Scheduler - Waiting for semaphore indicating worker exit %u/%u", i + 1,
681
0
           (unsigned int)fr_dlist_num_elements(&sc->workers));
682
0
    SEM_WAIT_INTR(sc->worker_sem);
683
0
  }
684
0
  DEBUG2("Scheduler - All workers indicated exit complete");
685
686
  /*
687
   *  Clean up the exited workers.
688
   */
689
0
  while ((sw = fr_dlist_pop_head(&sc->workers)) != NULL) {
690
    /*
691
     *  Ensure that the thread has exited before
692
     *  cleaning up the context.
693
     *
694
     *  This also ensures that the child threads have
695
     *  exited before the main thread cleans up the
696
     *  module instances.
697
     */
698
0
    if ((ret = pthread_join(sw->thread.pthread_id, NULL)) != 0) {
699
0
      ERROR("Failed joining worker %i: %s", sw->thread.id, fr_syserror(ret));
700
0
    } else {
701
0
      DEBUG2("Worker %i joined (cleaned up)", sw->thread.id);
702
0
    }
703
0
  }
704
705
0
  fr_coord_thread_join();
706
707
0
  fr_sem_free(sc->coord_sem);
708
0
  fr_sem_free(sc->network_sem);
709
0
  fr_sem_free(sc->worker_sem);
710
711
0
done:
712
  /*
713
   *  Now that all of the workers are done, we can return to
714
   *  the caller, and have it dlclose() the modules.
715
   */
716
0
  talloc_free(sc);
717
0
  *sc_to_free = NULL;
718
719
0
  return 0;
720
0
}
721
722
/** Add a fr_listen_t to a scheduler.
723
 *
724
 * @param[in] sc the scheduler
725
 * @param[in] li the ctx and callbacks for the transport.
726
 * @return
727
 *  - NULL on error
728
 *  - the fr_network_t that the socket was added to.
729
 */
730
fr_network_t *fr_schedule_listen_add(fr_schedule_t *sc, fr_listen_t *li)
731
0
{
732
0
  fr_network_t *nr;
733
734
0
  (void) talloc_get_type_abort(sc, fr_schedule_t);
735
736
0
  if (sc->single_threaded) {
737
0
    nr = sc->single_network;
738
0
  } else {
739
0
    fr_schedule_network_t *sn;
740
741
    /*
742
     *  @todo - round robin it among the listeners?
743
     *  or maybe add it to the same parent thread?
744
     */
745
0
    sn = fr_dlist_head(&sc->networks);
746
0
    nr = sn->nr;
747
0
  }
748
749
0
  if (fr_network_listen_add(nr, li) < 0) return NULL;
750
751
0
  return nr;
752
0
}
753
754
/** Add a directory NOTE_EXTEND to a scheduler.
755
 *
756
 * @param[in] sc the scheduler
757
 * @param[in] li the ctx and callbacks for the transport.
758
 * @return
759
 *  - NULL on error
760
 *  - the fr_network_t that the socket was added to.
761
 */
762
fr_network_t *fr_schedule_directory_add(fr_schedule_t *sc, fr_listen_t *li)
763
0
{
764
0
  fr_network_t *nr;
765
766
0
  (void) talloc_get_type_abort(sc, fr_schedule_t);
767
768
0
  if (sc->single_threaded) {
769
0
    nr = sc->single_network;
770
0
  } else {
771
0
    fr_schedule_network_t *sn;
772
773
    /*
774
     *  @todo - round robin it among the listeners?
775
     *  or maybe add it to the same parent thread?
776
     */
777
0
    sn = fr_dlist_head(&sc->networks);
778
0
    nr = sn->nr;
779
0
  }
780
781
0
  if (fr_network_directory_add(nr, li) < 0) return NULL;
782
783
0
  return nr;
784
0
}