/src/freeradius-server/src/lib/io/schedule.c
Line | Count | Source |
1 | | /* |
2 | | * This program is free software; you can redistribute it and/or modify |
3 | | * it under the terms of the GNU General Public License as published by |
4 | | * the Free Software Foundation; either version 2 of the License, or |
5 | | * (at your option) any later version. |
6 | | * |
7 | | * This program is distributed in the hope that it will be useful, |
8 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | | * GNU General Public License for more details. |
11 | | * |
12 | | * You should have received a copy of the GNU General Public License |
13 | | * along with this program; if not, write to the Free Software |
14 | | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA |
15 | | */ |
16 | | |
17 | | /** |
18 | | * $Id: 4c6ba355f46cf1d16003fedfdaa5a02702d293dc $ |
19 | | * |
20 | | * @brief Network / worker thread scheduling |
21 | | * @file io/schedule.c |
22 | | * |
23 | | * @copyright 2016 Alan DeKok (aland@freeradius.org) |
24 | | */ |
RCSID("$Id: 4c6ba355f46cf1d16003fedfdaa5a02702d293dc $")

/*
 *	Logging destination for this file.  It expands to `sc->log`, so any
 *	code that uses it needs a variable named `sc` (an fr_schedule_t *) in
 *	scope.  NOTE(review): presumably consumed by the ERROR / PERROR / INFO /
 *	DEBUG* macros used throughout this file - confirm in the log headers.
 */
#define LOG_DST sc->log
28 | | |
29 | | #include <freeradius-devel/autoconf.h> |
30 | | |
31 | | #include <freeradius-devel/io/schedule.h> |
32 | | #include <freeradius-devel/io/thread.h> |
33 | | #include <freeradius-devel/util/dlist.h> |
34 | | #include <freeradius-devel/util/rb.h> |
35 | | #include <freeradius-devel/util/syserror.h> |
36 | | #include <freeradius-devel/server/trigger.h> |
37 | | #include <freeradius-devel/util/semaphore.h> |
38 | | |
39 | | #include <pthread.h> |
40 | | |
/** Scheduler specific information for worker threads
 *
 * Wraps a fr_worker_t, tracking additional information that
 * the scheduler uses.
 *
 * One of these is talloc'd (as a child of the scheduler) for every worker
 * thread in multi-threaded mode, and is passed to fr_schedule_worker_thread()
 * as its thread argument.
 */
typedef struct {
	fr_thread_t	thread;		//!< common thread structure - must be first!
					//!< NOTE(review): presumably because the generic
					//!< fr_thread_* helpers treat this struct as an
					//!< fr_thread_t - confirm before reordering.

	int		uses;		//!< how many network threads are using it
					//!< NOTE(review): never read or written in schedule.c.
	fr_time_t	cpu_time;	//!< how much CPU time this worker has used
					//!< NOTE(review): never read or written in schedule.c.

	fr_schedule_t	*sc;		//!< the scheduler we are running under

	fr_worker_t	*worker;	//!< the worker data structure.  Allocated by the
					//!< worker thread itself, and reset to NULL when
					//!< that thread destroys it on exit.
} fr_schedule_worker_t;
56 | | |
/** Scheduler specific information for network threads
 *
 * Wraps a fr_network_t, tracking additional information that
 * the scheduler uses.
 *
 * One of these is talloc'd (as a child of the scheduler) for every network
 * thread in multi-threaded mode, and is passed to fr_schedule_network_thread()
 * as its thread argument.
 */
typedef struct {
	fr_thread_t	thread;		//!< common thread structure - must be first!

	fr_schedule_t	*sc;		//!< the scheduler we are running under

	fr_network_t	*nr;		//!< the receive data structure.  Created by the
					//!< network thread, in that thread's own talloc ctx.

	fr_timer_t	*ev;		//!< timer for stats_interval.  Only armed when the
					//!< configured stats_interval is positive.
} fr_schedule_network_t;
71 | | |
72 | | |
/**
 * The scheduler
 *
 * Owns either a set of network / worker threads (multi-threaded mode), or a
 * single network and worker which run in the caller's event list
 * (single-threaded mode).
 */
struct fr_schedule_s {
	bool		running;		//!< is the scheduler running?  Set true by
						//!< fr_schedule_create(), false by fr_schedule_destroy().
						//!< NOTE(review): not read anywhere in this file.

	CONF_SECTION	*cs;			//!< thread pool configuration section
	fr_event_list_t	*el;			//!< event list for single-threaded mode.
						//!< (Also handed to the main-thread instantiate
						//!< callback in multi-threaded mode.)
	bool		single_threaded;	//!< true if running in single-threaded mode.

	fr_log_t	*log;			//!< log destination
	fr_log_lvl_t	lvl;			//!< log level

	fr_schedule_config_t *config;		//!< configuration.  Either caller-owned (and clamped
						//!< in place) or a talloc'd default.

	unsigned int	num_workers_exited;	//!< number of exited workers
						//!< NOTE(review): never read or written in schedule.c.

	fr_sem_t	*worker_sem;		//!< for inter-thread signaling
	fr_sem_t	*network_sem;		//!< for inter-thread signaling
	fr_sem_t	*coord_sem;		//!< for inter-thread signaling

	fr_schedule_thread_instantiate_t	worker_thread_instantiate;	//!< thread instantiation callback
	fr_schedule_thread_detach_t		worker_thread_detach;		//!< frees whatever worker_thread_instantiate
										//!< allocated.  Currently always called
										//!< with a NULL uctx.

	fr_dlist_head_t	workers;		//!< list of workers
	fr_dlist_head_t	networks;		//!< list of networks

	fr_network_t	*single_network;	//!< for single-threaded mode
	fr_worker_t	*single_worker;		//!< for single-threaded mode
};
103 | | |
/*
 *	Per-thread worker identifier.  -1 means "not a worker thread".
 *	Each worker thread stores its own ID here when it starts.
 */
static _Thread_local int worker_id = -1;

/** Get the ID of the worker running on the calling thread
 *
 * @return the worker ID, or -1 if this thread has never been assigned one.
 */
int fr_schedule_worker_id(void)
{
	int const current = worker_id;

	return current;
}

/** Override the worker ID of the calling thread
 *
 * **Only to be used in test programs like unit_test_module**
 *
 * @param[in] id	value subsequently returned by fr_schedule_worker_id()
 *			on this thread.
 */
void fr_schedule_worker_id_set(int id)
{
	int const new_id = id;

	worker_id = new_id;
}
123 | | |
124 | | /** Entry point for worker threads |
125 | | * |
126 | | * @param[in] arg the fr_schedule_worker_t |
127 | | * @return NULL |
128 | | */ |
129 | | static void *fr_schedule_worker_thread(void *arg) |
130 | 0 | { |
131 | 0 | fr_schedule_worker_t *sw = talloc_get_type_abort(arg, fr_schedule_worker_t); |
132 | 0 | fr_schedule_t *sc = sw->sc; |
133 | 0 | fr_thread_status_t status = FR_THREAD_FAIL; |
134 | 0 | char worker_name[32]; |
135 | |
|
136 | 0 | worker_id = sw->thread.id; /* Store the current worker ID */ |
137 | |
|
138 | 0 | snprintf(worker_name, sizeof(worker_name), "Worker %d", sw->thread.id); |
139 | |
|
140 | 0 | #ifdef HAVE_PTHREAD_SETNAME_NP |
141 | | # ifdef __APPLE__ |
142 | | pthread_setname_np(worker_name); |
143 | | # else |
144 | 0 | pthread_setname_np(pthread_self(), worker_name); |
145 | 0 | # endif |
146 | 0 | #endif |
147 | |
|
148 | 0 | if (fr_thread_setup(&sw->thread, worker_name) < 0) goto fail; |
149 | | |
150 | 0 | sw->worker = fr_worker_alloc(sw->thread.ctx, sw->thread.el, worker_name, sc->log, sc->lvl, &sc->config->worker); |
151 | 0 | if (!sw->worker) { |
152 | 0 | PERROR("%s - Failed creating worker", worker_name); |
153 | 0 | goto fail; |
154 | 0 | } |
155 | | |
156 | | /* |
157 | | * @todo make this a registry |
158 | | */ |
159 | 0 | if (sc->worker_thread_instantiate) { |
160 | 0 | CONF_SECTION *cs; |
161 | 0 | char section_name[32]; |
162 | |
|
163 | 0 | snprintf(section_name, sizeof(section_name), "%u", sw->thread.id); |
164 | |
|
165 | 0 | cs = cf_section_find(sc->cs, "worker", section_name); |
166 | 0 | if (!cs) cs = cf_section_find(sc->cs, "worker", NULL); |
167 | |
|
168 | 0 | if (sc->worker_thread_instantiate(sw->thread.ctx, sw->thread.el, cs) < 0) { |
169 | 0 | PERROR("%s - Worker thread instantiation failed", worker_name); |
170 | 0 | goto fail; |
171 | 0 | } |
172 | 0 | } |
173 | | |
174 | | /* |
175 | | * Add this worker to all network threads. |
176 | | */ |
177 | 0 | fr_dlist_foreach(&sc->networks, fr_schedule_network_t, sn) { |
178 | 0 | if (unlikely(fr_network_worker_add(sn->nr, sw->worker) < 0)) { |
179 | 0 | PERROR("%s - Failed adding worker to network %u", worker_name, sn->thread.id); |
180 | 0 | goto fail; /* FIXME - Should maybe try to undo partial adds? */ |
181 | 0 | } |
182 | 0 | } |
183 | | |
184 | | /* |
185 | | * Tell the originator that the thread has started. |
186 | | */ |
187 | 0 | fr_thread_start(&sw->thread, sc->worker_sem); |
188 | | |
189 | | /* |
190 | | * Do all of the work. |
191 | | */ |
192 | 0 | fr_worker(sw->worker); |
193 | |
|
194 | 0 | status = FR_THREAD_EXITED; |
195 | |
|
196 | 0 | fail: |
197 | 0 | if (sw->worker) { |
198 | 0 | fr_worker_destroy(sw->worker); |
199 | 0 | sw->worker = NULL; |
200 | 0 | } |
201 | |
|
202 | 0 | if (sc->worker_thread_detach) sc->worker_thread_detach(NULL); /* Fixme once we figure out what uctx should be */ |
203 | |
|
204 | 0 | fr_thread_exit(&sw->thread, status, sc->worker_sem); |
205 | |
|
206 | 0 | return NULL; |
207 | 0 | } |
208 | | |
209 | | |
210 | | static void stats_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx) |
211 | 0 | { |
212 | 0 | fr_schedule_network_t *sn = talloc_get_type_abort(uctx, fr_schedule_network_t); |
213 | |
|
214 | 0 | fr_network_stats_log(sn->nr, sn->sc->log); |
215 | |
|
216 | 0 | (void) fr_timer_at(sn, tl, &sn->ev, fr_time_add(now, sn->sc->config->stats_interval), false, stats_timer, sn); |
217 | 0 | } |
218 | | |
219 | | /** Initialize and run the network thread. |
220 | | * |
221 | | * @param[in] arg the fr_schedule_network_t |
222 | | * @return NULL |
223 | | */ |
224 | | static void *fr_schedule_network_thread(void *arg) |
225 | 0 | { |
226 | 0 | fr_schedule_network_t *sn = talloc_get_type_abort(arg, fr_schedule_network_t); |
227 | 0 | fr_schedule_t *sc = sn->sc; |
228 | 0 | fr_thread_status_t status = FR_THREAD_FAIL; |
229 | 0 | char network_name[32]; |
230 | |
|
231 | 0 | snprintf(network_name, sizeof(network_name), "Network %d", sn->thread.id); |
232 | |
|
233 | 0 | #ifdef HAVE_PTHREAD_SETNAME_NP |
234 | | # ifdef __APPLE__ |
235 | | pthread_setname_np(network_name); |
236 | | # else |
237 | 0 | pthread_setname_np(pthread_self(), network_name); |
238 | 0 | # endif |
239 | 0 | #endif |
240 | |
|
241 | 0 | if (fr_thread_setup(&sn->thread, network_name) < 0) goto fail; |
242 | | |
243 | 0 | sn->nr = fr_network_create(sn->thread.ctx, sn->thread.el, network_name, sc->log, sc->lvl, &sc->config->network); |
244 | 0 | if (!sn->nr) { |
245 | 0 | PERROR("%s - Failed creating network", network_name); |
246 | 0 | goto fail; |
247 | 0 | } |
248 | | |
249 | | /* |
250 | | * Tell the originator that the thread has started. |
251 | | */ |
252 | 0 | fr_thread_start(&sn->thread, sc->network_sem); |
253 | | |
254 | | /* |
255 | | * Print out statistics for this network IO handler. |
256 | | */ |
257 | 0 | if (fr_time_delta_ispos(sc->config->stats_interval)) { |
258 | 0 | (void) fr_timer_in(sn, sn->thread.el->tl, &sn->ev, sn->sc->config->stats_interval, false, stats_timer, sn); |
259 | 0 | } |
260 | | |
261 | | /* |
262 | | * Call the main event processing loop of the network |
263 | | * thread Will not return until the worker is about |
264 | | * to exit. |
265 | | */ |
266 | 0 | fr_network(sn->nr); |
267 | |
|
268 | 0 | status = FR_THREAD_EXITED; |
269 | |
|
270 | 0 | fail: |
271 | 0 | fr_thread_exit(&sn->thread, status, sc->network_sem); |
272 | |
|
273 | 0 | return NULL; |
274 | 0 | } |
275 | | |
276 | | /** Create a scheduler and spawn the child threads. |
277 | | * |
278 | | * @param[in] ctx talloc context. |
279 | | * @param[in] single_threaded no workers are spawned, everything runs in a common event loop. |
280 | | * @param[in] el event list, only for single-threaded mode. |
281 | | * @param[in] logger destination for all logging messages. |
282 | | * @param[in] lvl log level. |
283 | | * @param[in] worker_thread_instantiate callback for new worker threads. |
284 | | * @param[in] worker_thread_detach callback to destroy resources |
285 | | * allocated by worker_thread_instantiate. |
286 | | * @param[in] config configuration for the scheduler |
287 | | * @return |
288 | | * - NULL on error |
289 | | * - fr_schedule_t new scheduler |
290 | | */ |
291 | | fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, |
292 | | bool single_threaded, |
293 | | fr_event_list_t *el, |
294 | | fr_log_t *logger, fr_log_lvl_t lvl, |
295 | | fr_schedule_thread_instantiate_t worker_thread_instantiate, |
296 | | fr_schedule_thread_detach_t worker_thread_detach, |
297 | | fr_schedule_config_t *config) |
298 | 0 | { |
299 | 0 | unsigned int i; |
300 | 0 | fr_schedule_worker_t *sw, *next_sw; |
301 | 0 | fr_schedule_network_t *sn, *next_sn; |
302 | 0 | fr_schedule_t *sc; |
303 | |
|
304 | 0 | sc = talloc_zero(ctx, fr_schedule_t); |
305 | 0 | if (!sc) { |
306 | 0 | fr_strerror_const("Failed allocating memory"); |
307 | 0 | return NULL; |
308 | 0 | } |
309 | | |
310 | | /* |
311 | | * Parse any scheduler-specific configuration. |
312 | | */ |
313 | 0 | if (!config) { |
314 | 0 | MEM(sc->config = talloc_zero(sc, fr_schedule_config_t)); |
315 | 0 | sc->config->max_networks = 1; |
316 | 0 | sc->config->max_workers = 4; |
317 | 0 | } else { |
318 | 0 | sc->config = config; |
319 | |
|
320 | 0 | if (sc->config->max_networks < 1) sc->config->max_networks = 1; |
321 | 0 | if (sc->config->max_networks > 64) sc->config->max_networks = 64; |
322 | 0 | if (sc->config->max_workers < 1) sc->config->max_workers = 1; |
323 | 0 | if (sc->config->max_workers > 64) sc->config->max_workers = 64; |
324 | 0 | } |
325 | |
|
326 | 0 | sc->el = el; |
327 | 0 | sc->single_threaded = single_threaded; |
328 | 0 | sc->log = logger; |
329 | 0 | sc->lvl = lvl; |
330 | 0 | sc->cs = sc->config->cs; |
331 | |
|
332 | 0 | sc->worker_thread_instantiate = worker_thread_instantiate; |
333 | 0 | sc->worker_thread_detach = worker_thread_detach; |
334 | 0 | sc->running = true; |
335 | | |
336 | | /* |
337 | | * If we're single-threaded, create network / worker, and insert them into the event loop. |
338 | | */ |
339 | 0 | if (single_threaded) { |
340 | 0 | sc->single_network = fr_network_create(sc, el, "Network", sc->log, sc->lvl, &sc->config->network); |
341 | 0 | if (!sc->single_network) { |
342 | 0 | PERROR("Failed creating network"); |
343 | 0 | pre_instantiate_st_fail: |
344 | 0 | talloc_free(sc); |
345 | 0 | return NULL; |
346 | 0 | } |
347 | | |
348 | 0 | if (fr_coords_create(sc, el) < 0) { |
349 | 0 | PERROR("Failed creating coordinators"); |
350 | 0 | if (unlikely(fr_network_destroy(sc->single_network) < 0)) { |
351 | 0 | PERROR("Failed destroying network"); |
352 | 0 | } |
353 | 0 | goto pre_instantiate_st_fail; |
354 | 0 | } |
355 | | |
356 | 0 | worker_id = 0; |
357 | 0 | sc->single_worker = fr_worker_alloc(sc, el, "Worker", sc->log, sc->lvl, &sc->config->worker); |
358 | 0 | if (!sc->single_worker) { |
359 | 0 | PERROR("Failed creating worker"); |
360 | 0 | if (unlikely(fr_network_destroy(sc->single_network) < 0)) { |
361 | 0 | PERROR("Failed destroying network"); |
362 | 0 | } |
363 | 0 | goto pre_instantiate_st_fail; |
364 | 0 | } |
365 | | |
366 | | /* |
367 | | * Parent thread-specific data from the single_worker |
368 | | */ |
369 | 0 | if (sc->worker_thread_instantiate) { |
370 | 0 | CONF_SECTION *subcs; |
371 | |
|
372 | 0 | subcs = cf_section_find(sc->cs, "worker", "0"); |
373 | 0 | if (!subcs) subcs = cf_section_find(sc->cs, "worker", NULL); |
374 | |
|
375 | 0 | if (sc->worker_thread_instantiate(sc->single_worker, el, subcs) < 0) { |
376 | 0 | PERROR("Worker thread instantiation failed"); |
377 | 0 | destroy_both: |
378 | 0 | if (unlikely(fr_network_destroy(sc->single_network) < 0)) { |
379 | 0 | PERROR("Failed destroying network"); |
380 | 0 | } |
381 | 0 | fr_worker_destroy(sc->single_worker); |
382 | 0 | goto pre_instantiate_st_fail; |
383 | 0 | } |
384 | 0 | } |
385 | | |
386 | 0 | if (fr_command_register_hook(NULL, "0", sc->single_worker, cmd_worker_table) < 0) { |
387 | 0 | PERROR("Failed adding worker commands"); |
388 | 0 | st_fail: |
389 | 0 | if (sc->worker_thread_detach) sc->worker_thread_detach(NULL); |
390 | 0 | goto destroy_both; |
391 | 0 | } |
392 | | |
393 | 0 | if (fr_command_register_hook(NULL, "0", sc->single_network, cmd_network_table) < 0) { |
394 | 0 | PERROR("Failed adding network commands"); |
395 | 0 | goto st_fail; |
396 | 0 | } |
397 | | |
398 | | /* |
399 | | * Register the worker with the network, so |
400 | | * things like fr_network_send_request() work. |
401 | | */ |
402 | 0 | fr_network_worker_add_self(sc->single_network, sc->single_worker); |
403 | 0 | DEBUG("Scheduler created in single-threaded mode"); |
404 | |
|
405 | 0 | if (fr_event_pre_insert(el, fr_worker_pre_event, sc->single_worker) < 0) { |
406 | 0 | fr_strerror_const("Failed adding pre-check to event list"); |
407 | 0 | goto st_fail; |
408 | 0 | } |
409 | | |
410 | 0 | if (fr_coord_pre_event_insert(el) < 0) { |
411 | 0 | fr_strerror_const("Failed adding coordinator pre-check to event list"); |
412 | 0 | goto st_fail; |
413 | 0 | } |
414 | | |
415 | | /* |
416 | | * Add the event which processes request_t packets. |
417 | | */ |
418 | 0 | if (fr_event_post_insert(el, fr_worker_post_event, sc->single_worker) < 0) { |
419 | 0 | fr_strerror_const("Failed inserting post-processing event"); |
420 | 0 | goto st_fail; |
421 | 0 | } |
422 | | |
423 | 0 | if (fr_coord_post_event_insert(el) < 0) { |
424 | 0 | fr_strerror_const("Failed adding coordinator post-processing to event list"); |
425 | 0 | goto st_fail; |
426 | 0 | } |
427 | | |
428 | 0 | return sc; |
429 | 0 | } |
430 | | |
431 | | /* |
432 | | * Create the lists which hold the workers and networks. |
433 | | */ |
434 | 0 | fr_dlist_init(&sc->workers, fr_schedule_worker_t, thread.entry); |
435 | 0 | fr_dlist_init(&sc->networks, fr_schedule_network_t, thread.entry); |
436 | |
|
437 | 0 | sc->network_sem = fr_sem_alloc(); |
438 | 0 | if (!sc->network_sem) { |
439 | 0 | sem_fail: |
440 | 0 | ERROR("Failed creating semaphore: %s", fr_syserror(errno)); |
441 | 0 | fr_sem_free(sc->network_sem); |
442 | 0 | fr_sem_free(sc->worker_sem); |
443 | 0 | talloc_free(sc); |
444 | 0 | return NULL; |
445 | 0 | } |
446 | | |
447 | 0 | sc->worker_sem = fr_sem_alloc(); |
448 | 0 | if (!sc->worker_sem) goto sem_fail; |
449 | | |
450 | 0 | sc->coord_sem = fr_sem_alloc(); |
451 | 0 | if (!sc->coord_sem) goto sem_fail; |
452 | | |
453 | | /* |
454 | | * Create the network threads first. |
455 | | */ |
456 | 0 | for (i = 0; i < sc->config->max_networks; i++) { |
457 | 0 | DEBUG3("Creating %u/%u networks", i + 1, sc->config->max_networks); |
458 | | |
459 | | /* |
460 | | * Create a worker "glue" structure |
461 | | */ |
462 | 0 | sn = talloc_zero(sc, fr_schedule_network_t); |
463 | 0 | if (!sn) { |
464 | 0 | ERROR("Network %u - Failed allocating memory", i); |
465 | 0 | break; |
466 | 0 | } |
467 | | |
468 | 0 | sn->thread.id = i; |
469 | 0 | sn->sc = sc; |
470 | 0 | sn->thread.status = FR_THREAD_INITIALIZING; |
471 | |
|
472 | 0 | if (fr_thread_create(&sn->thread.pthread_id, fr_schedule_network_thread, sn) < 0) { |
473 | 0 | talloc_free(sn); |
474 | 0 | PERROR("Failed creating network %u", i); |
475 | 0 | break; |
476 | 0 | } |
477 | | |
478 | 0 | fr_dlist_insert_head(&sc->networks, sn); |
479 | 0 | } |
480 | | |
481 | | /* |
482 | | * Wait for all of the networks to signal us that either |
483 | | * they've started, OR there's been a problem and they |
484 | | * can't start. |
485 | | */ |
486 | 0 | if (fr_thread_wait_list(sc->network_sem, &sc->networks) < 0) { |
487 | 0 | fr_schedule_destroy(&sc); |
488 | 0 | return NULL; |
489 | 0 | } |
490 | | |
491 | | /* |
492 | | * Create the coordination threads |
493 | | */ |
494 | 0 | if (fr_coord_start(sc->config->max_workers, sc->coord_sem) < 0) { |
495 | 0 | fr_schedule_destroy(&sc); |
496 | 0 | return NULL; |
497 | 0 | }; |
498 | | |
499 | | /* |
500 | | * Create all of the workers. |
501 | | */ |
502 | 0 | for (i = 0; i < sc->config->max_workers; i++) { |
503 | 0 | DEBUG3("Creating %u/%u workers", i + 1, sc->config->max_workers); |
504 | | |
505 | | /* |
506 | | * Create a worker "glue" structure |
507 | | */ |
508 | 0 | sw = talloc_zero(sc, fr_schedule_worker_t); |
509 | 0 | if (!sw) { |
510 | 0 | ERROR("Worker %u - Failed allocating memory", i); |
511 | 0 | break; |
512 | 0 | } |
513 | | |
514 | 0 | sw->thread.id = i; |
515 | 0 | sw->sc = sc; |
516 | 0 | sw->thread.status = FR_THREAD_INITIALIZING; |
517 | |
|
518 | 0 | if (fr_thread_create(&sw->thread.pthread_id, fr_schedule_worker_thread, sw) < 0) { |
519 | 0 | talloc_free(sw); |
520 | 0 | PERROR("Failed creating worker %u", i); |
521 | 0 | break; |
522 | 0 | } |
523 | | |
524 | 0 | fr_dlist_insert_head(&sc->workers, sw); |
525 | 0 | } |
526 | | |
527 | | /* |
528 | | * Wait for all of the workers to signal us that either |
529 | | * they've started, OR there's been a problem and they |
530 | | * can't start. |
531 | | */ |
532 | 0 | if (fr_thread_wait_list(sc->worker_sem, &sc->workers) < 0) { |
533 | 0 | fr_schedule_destroy(&sc); |
534 | 0 | return NULL; |
535 | 0 | } |
536 | | |
537 | 0 | for (sw = fr_dlist_head(&sc->workers), i = 0; |
538 | 0 | sw != NULL; |
539 | 0 | sw = next_sw, i++) { |
540 | 0 | char buffer[32]; |
541 | |
|
542 | 0 | next_sw = fr_dlist_next(&sc->workers, sw); |
543 | |
|
544 | 0 | snprintf(buffer, sizeof(buffer), "%d", i); |
545 | 0 | if (fr_command_register_hook(NULL, buffer, sw->worker, cmd_worker_table) < 0) { |
546 | 0 | PERROR("Failed adding worker commands"); |
547 | 0 | mt_fail: |
548 | 0 | fr_schedule_destroy(&sc); |
549 | 0 | return NULL; |
550 | 0 | } |
551 | 0 | } |
552 | | |
553 | 0 | for (sn = fr_dlist_head(&sc->networks), i = 0; |
554 | 0 | sn != NULL; |
555 | 0 | sn = next_sn, i++) { |
556 | 0 | char buffer[32]; |
557 | |
|
558 | 0 | next_sn = fr_dlist_next(&sc->networks, sn); |
559 | |
|
560 | 0 | snprintf(buffer, sizeof(buffer), "%d", i); |
561 | 0 | if (fr_command_register_hook(NULL, buffer, sn->nr, cmd_network_table) < 0) { |
562 | 0 | PERROR("Failed adding network commands"); |
563 | 0 | goto mt_fail; |
564 | 0 | } |
565 | 0 | } |
566 | | |
567 | 0 | if (sc) INFO("Scheduler created successfully with %u networks and %u workers", |
568 | 0 | sc->config->max_networks, (unsigned int)fr_dlist_num_elements(&sc->workers)); |
569 | | |
570 | | /* |
571 | | * Instantiate thread-local data for the main thread too. |
572 | | * In single-threaded mode this is done above. In |
573 | | * multi-worker mode the main thread also needs module |
574 | | * thread data so that triggers can use module xlats. |
575 | | */ |
576 | 0 | if (sc->worker_thread_instantiate && |
577 | 0 | unlikely((sc->worker_thread_instantiate(sc, el, NULL) < 0))) { |
578 | 0 | PERROR("Main thread instantiation failed"); |
579 | 0 | goto mt_fail; |
580 | 0 | } |
581 | | |
582 | 0 | return sc; |
583 | 0 | } |
584 | | |
585 | | /** Destroy a scheduler, and tell its child threads to exit. |
586 | | * |
587 | | * @note This may be called with no worker or network threads in the case of a |
588 | | * instantiation error. This function _should_ deal with that condition |
589 | | * gracefully. |
590 | | * |
591 | | * @param[in] sc_to_free the scheduler |
592 | | * @return |
593 | | * - <0 on error |
594 | | * - 0 on success |
595 | | */ |
596 | | int fr_schedule_destroy(fr_schedule_t **sc_to_free) |
597 | 0 | { |
598 | 0 | fr_schedule_t *sc = *sc_to_free; |
599 | 0 | unsigned int i; |
600 | 0 | fr_schedule_worker_t *sw; |
601 | 0 | fr_schedule_network_t *sn; |
602 | 0 | int ret; |
603 | |
|
604 | 0 | if (!sc) return 0; |
605 | | |
606 | 0 | sc->running = false; |
607 | | |
608 | | |
609 | | |
610 | | /* |
611 | | * Single threaded mode: kill the only network / worker we have. |
612 | | */ |
613 | 0 | if (sc->single_threaded) { |
614 | | /* |
615 | | * Destroy the network side first. It tells the |
616 | | * workers to close. |
617 | | */ |
618 | 0 | if (unlikely(fr_network_destroy(sc->single_network) < 0)) { |
619 | 0 | ERROR("Failed destroying network"); |
620 | 0 | } |
621 | 0 | fr_worker_destroy(sc->single_worker); |
622 | 0 | fr_coords_destroy(); |
623 | |
|
624 | 0 | goto done; |
625 | 0 | } else { |
626 | | /* |
627 | | * Detach thread-local data for the main thread. |
628 | | * Worker threads handle their own detach, but |
629 | | * the main thread was instantiated explicitly |
630 | | * by fr_schedule_create. |
631 | | */ |
632 | 0 | if (sc->worker_thread_detach) sc->worker_thread_detach(NULL); |
633 | 0 | } |
634 | | |
635 | | /* |
636 | | * Signal each network thread to exit. |
637 | | */ |
638 | 0 | fr_dlist_foreach(&sc->networks, fr_schedule_network_t, sne) { |
639 | 0 | if (fr_network_exit(sne->nr) < 0) { |
640 | 0 | PERROR("Failed signaling network %i to exit", sne->thread.id); |
641 | 0 | } |
642 | 0 | } |
643 | | |
644 | | /* |
645 | | * If the network threads are running, tell them to exit, |
646 | | * and wait for them to do so. Each network thread tells |
647 | | * all of its worker threads that it's exiting. It then |
648 | | * closes the channels. When the workers see that there |
649 | | * are no input channels, they exit, too. |
650 | | */ |
651 | 0 | for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->networks); i++) { |
652 | 0 | DEBUG2("Scheduler - Waiting for semaphore indicating network exit %u/%u", i + 1, |
653 | 0 | (unsigned int)fr_dlist_num_elements(&sc->networks)); |
654 | 0 | SEM_WAIT_INTR(sc->network_sem); |
655 | 0 | } |
656 | 0 | DEBUG2("Scheduler - All networks indicated exit complete"); |
657 | |
|
658 | 0 | while ((sn = fr_dlist_pop_head(&sc->networks)) != NULL) { |
659 | | /* |
660 | | * Ensure that the thread has exited before |
661 | | * cleaning up the context. |
662 | | * |
663 | | * This also ensures that the child threads have |
664 | | * exited before the main thread cleans up the |
665 | | * module instances. |
666 | | */ |
667 | 0 | if ((ret = pthread_join(sn->thread.pthread_id, NULL)) != 0) { |
668 | 0 | ERROR("Failed joining network %i: %s", sn->thread.id, fr_syserror(ret)); |
669 | 0 | } else { |
670 | 0 | DEBUG2("Network %i joined (cleaned up)", sn->thread.id); |
671 | 0 | } |
672 | 0 | } |
673 | | |
674 | | /* |
675 | | * Wait for all worker threads to finish. THEN clean up |
676 | | * modules. Otherwise, the modules will be removed from |
677 | | * underneath the workers! |
678 | | */ |
679 | 0 | for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->workers); i++) { |
680 | 0 | DEBUG2("Scheduler - Waiting for semaphore indicating worker exit %u/%u", i + 1, |
681 | 0 | (unsigned int)fr_dlist_num_elements(&sc->workers)); |
682 | 0 | SEM_WAIT_INTR(sc->worker_sem); |
683 | 0 | } |
684 | 0 | DEBUG2("Scheduler - All workers indicated exit complete"); |
685 | | |
686 | | /* |
687 | | * Clean up the exited workers. |
688 | | */ |
689 | 0 | while ((sw = fr_dlist_pop_head(&sc->workers)) != NULL) { |
690 | | /* |
691 | | * Ensure that the thread has exited before |
692 | | * cleaning up the context. |
693 | | * |
694 | | * This also ensures that the child threads have |
695 | | * exited before the main thread cleans up the |
696 | | * module instances. |
697 | | */ |
698 | 0 | if ((ret = pthread_join(sw->thread.pthread_id, NULL)) != 0) { |
699 | 0 | ERROR("Failed joining worker %i: %s", sw->thread.id, fr_syserror(ret)); |
700 | 0 | } else { |
701 | 0 | DEBUG2("Worker %i joined (cleaned up)", sw->thread.id); |
702 | 0 | } |
703 | 0 | } |
704 | |
|
705 | 0 | fr_coord_thread_join(); |
706 | |
|
707 | 0 | fr_sem_free(sc->coord_sem); |
708 | 0 | fr_sem_free(sc->network_sem); |
709 | 0 | fr_sem_free(sc->worker_sem); |
710 | |
|
711 | 0 | done: |
712 | | /* |
713 | | * Now that all of the workers are done, we can return to |
714 | | * the caller, and have it dlclose() the modules. |
715 | | */ |
716 | 0 | talloc_free(sc); |
717 | 0 | *sc_to_free = NULL; |
718 | |
|
719 | 0 | return 0; |
720 | 0 | } |
721 | | |
722 | | /** Add a fr_listen_t to a scheduler. |
723 | | * |
724 | | * @param[in] sc the scheduler |
725 | | * @param[in] li the ctx and callbacks for the transport. |
726 | | * @return |
727 | | * - NULL on error |
728 | | * - the fr_network_t that the socket was added to. |
729 | | */ |
730 | | fr_network_t *fr_schedule_listen_add(fr_schedule_t *sc, fr_listen_t *li) |
731 | 0 | { |
732 | 0 | fr_network_t *nr; |
733 | |
|
734 | 0 | (void) talloc_get_type_abort(sc, fr_schedule_t); |
735 | |
|
736 | 0 | if (sc->single_threaded) { |
737 | 0 | nr = sc->single_network; |
738 | 0 | } else { |
739 | 0 | fr_schedule_network_t *sn; |
740 | | |
741 | | /* |
742 | | * @todo - round robin it among the listeners? |
743 | | * or maybe add it to the same parent thread? |
744 | | */ |
745 | 0 | sn = fr_dlist_head(&sc->networks); |
746 | 0 | nr = sn->nr; |
747 | 0 | } |
748 | |
|
749 | 0 | if (fr_network_listen_add(nr, li) < 0) return NULL; |
750 | | |
751 | 0 | return nr; |
752 | 0 | } |
753 | | |
754 | | /** Add a directory NOTE_EXTEND to a scheduler. |
755 | | * |
756 | | * @param[in] sc the scheduler |
757 | | * @param[in] li the ctx and callbacks for the transport. |
758 | | * @return |
759 | | * - NULL on error |
760 | | * - the fr_network_t that the socket was added to. |
761 | | */ |
762 | | fr_network_t *fr_schedule_directory_add(fr_schedule_t *sc, fr_listen_t *li) |
763 | 0 | { |
764 | 0 | fr_network_t *nr; |
765 | |
|
766 | 0 | (void) talloc_get_type_abort(sc, fr_schedule_t); |
767 | |
|
768 | 0 | if (sc->single_threaded) { |
769 | 0 | nr = sc->single_network; |
770 | 0 | } else { |
771 | 0 | fr_schedule_network_t *sn; |
772 | | |
773 | | /* |
774 | | * @todo - round robin it among the listeners? |
775 | | * or maybe add it to the same parent thread? |
776 | | */ |
777 | 0 | sn = fr_dlist_head(&sc->networks); |
778 | 0 | nr = sn->nr; |
779 | 0 | } |
780 | |
|
781 | 0 | if (fr_network_directory_add(nr, li) < 0) return NULL; |
782 | | |
783 | 0 | return nr; |
784 | 0 | } |