/src/freeradius-server/src/lib/io/worker.c
Line | Count | Source |
1 | | /* |
2 | | * This program is free software; you can redistribute it and/or modify |
3 | | * it under the terms of the GNU General Public License as published by |
4 | | * the Free Software Foundation; either version 2 of the License, or |
5 | | * (at your option) any later version. |
6 | | * |
7 | | * This program is distributed in the hope that it will be useful, |
8 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | | * GNU General Public License for more details. |
11 | | * |
12 | | * You should have received a copy of the GNU General Public License |
13 | | * along with this program; if not, write to the Free Software |
14 | | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA |
15 | | */ |
16 | | |
17 | | /** |
18 | | * $Id: c4af285a9dcfb80489a3ba2972595170571c8915 $ |
19 | | * |
20 | | * @brief Worker thread functions. |
21 | | * @file io/worker.c |
22 | | * |
23 | | * The "worker" thread is the one responsible for the bulk of the |
24 | | * work done when processing a request. Workers are spawned by the |
25 | | * scheduler, and create a kqueue (KQ) and control-plane |
26 | | * Atomic Queue (AQ) for control-plane communication. |
27 | | * |
28 | | * When a network thread discovers that it needs more workers, it |
29 | | * asks the scheduler for a KQ/AQ combination. The network thread |
30 | | * then creates a channel dedicated to that worker, and sends the |
31 | | * channel to the worker in a "new channel" message. The worker |
32 | | * receives the channel, and sends an ACK back to the network thread. |
33 | | * |
34 | | * The network thread then sends the worker new packets, which the |
35 | | * worker receives and processes. |
36 | | * |
37 | | * When a packet is decoded, it is put into the "runnable" heap, and |
38 | | * also into the timeout sublist. The main loop fr_worker() then |
39 | | * pulls new requests off of this heap and runs them. The main event |
40 | | * loop checks the head of the timeout sublist, and forcefully terminates |
41 | | * any requests which have been running for too long. |
42 | | * |
43 | | * If a request is yielded, it is placed onto the yielded list in |
44 | | * the worker "tracking" data structure. |
45 | | * |
46 | | * @copyright 2016 Alan DeKok (aland@freeradius.org) |
47 | | */ |
48 | | |
49 | | RCSID("$Id: c4af285a9dcfb80489a3ba2972595170571c8915 $") |
50 | | |
51 | 0 | #define LOG_PREFIX worker->name |
52 | 0 | #define LOG_DST worker->log |
53 | | |
54 | | #include <freeradius-devel/io/channel.h> |
55 | | #include <freeradius-devel/io/listen.h> |
56 | | #include <freeradius-devel/io/worker.h> |
57 | | #include <freeradius-devel/unlang/base.h> |
58 | | #include <freeradius-devel/util/minmax_heap.h> |
59 | | #include <freeradius-devel/util/timer.h> |
60 | | |
61 | | #include <stdalign.h> |
62 | | |
63 | | #ifdef WITH_VERIFY_PTR |
64 | | static void worker_verify(fr_worker_t *worker); |
65 | 0 | #define WORKER_VERIFY worker_verify(worker) |
66 | | #else |
67 | | #define WORKER_VERIFY |
68 | | #endif |
69 | | |
70 | | static _Atomic(uint64_t) request_number = 0; |
71 | | |
72 | | FR_SLAB_TYPES(request, request_t) |
73 | 0 | FR_SLAB_FUNCS(request, request_t) Unexecuted instantiation: worker.c:request_slab_release Unexecuted instantiation: worker.c:request_slab_list_alloc Unexecuted instantiation: worker.c:request_slab_init Unexecuted instantiation: worker.c:_request_slab_cleanup Unexecuted instantiation: worker.c:request_slab_reserve Unexecuted instantiation: worker.c:request_slab_element_init |
74 | | |
75 | | static _Thread_local fr_ring_buffer_t *fr_worker_rb; |
76 | | |
/** One slot in the worker's array of channels
 *
 */
typedef struct {
	fr_channel_t		*ch;		//!< The channel, or NULL if this slot is free.

	/*
	 *	To save time, we don't care about num_elements here.  Which means that we don't
	 *	need to cache or lookup the fr_worker_listen_t when we free a request.
	 *
	 *	NOTE(review): this list is initialised with fr_async_t / entry when the
	 *	channel is opened, but worker_requests_cancel() treats popped entries
	 *	as request_t - confirm what is inserted here.
	 */
	fr_dlist_head_t		dlist;
} fr_worker_channel_t;
86 | | |
/**
 *	A worker which takes packets from a master, and processes them.
 */
struct fr_worker_s {
	char const		*name;		//!< name of this worker
	fr_worker_config_t	config;		//!< external configuration

	unlang_interpret_t	*intp;		//!< Worker's local interpreter.

	pthread_t		thread_id;	//!< my thread ID

	fr_log_t const		*log;		//!< log destination
	fr_log_lvl_t		lvl;		//!< log level

	fr_atomic_queue_t	*aq_control;	//!< atomic queue for control messages sent to me

	fr_control_t		*control;	//!< the control plane

	fr_event_list_t		*el;		//!< our event list

	int			num_channels;	//!< number of occupied slots in the channel array

	fr_heap_t		*runnable;	//!< heap of requests which are ready to be run

	fr_timer_list_t		*timeout;	//!< Timer list holding one max_request_time timer
						//!< per active request.  Its event count is also
						//!< used as the worker's request count.
	fr_time_delta_t		max_request_time;	//!< maximum time a request can be processed
							//!< NOTE(review): the request timer is armed from
							//!< config.max_request_time, not this field.

	fr_rb_tree_t		*dedup;		//!< de-dup tree, searched with a request_t as the key

	fr_rb_tree_t		*listeners;	//!< tree of fr_worker_listen_t, so we can cancel
						//!< requests when a listener goes away

	fr_io_stats_t		stats;		//!< input / output stats
	fr_time_elapsed_t	cpu_time;	//!< histogram of total CPU time per request
	fr_time_elapsed_t	wall_clock;	//!< histogram of wall clock time per request

	uint64_t		num_naks;	//!< number of messages which were nak'd
	uint64_t		num_active;	//!< number of requests with time tracking started
						//!< but not yet ended

	fr_time_delta_t		predicted;	//!< How long we predict a request will take to execute.
	fr_time_tracking_t	tracking;	//!< how much time the worker has spent doing things.

	bool			was_sleeping;	//!< used to suppress multiple sleep signals in a row
	bool			exiting;	//!< are we exiting?

	fr_worker_channel_t	*channel;	//!< array of config.max_channels channel slots

	request_slab_list_t	*slab;		//!< slab allocator for request_t
};
135 | | |
/** Requests belonging to a single listener
 *
 * Stored in fr_worker_t->listeners so that every request using a listener
 * can be cancelled when that listener is closed.
 */
typedef struct {
	fr_listen_t const	*listener;	//!< incoming packets; also the tree key

	fr_rb_node_t		node;		//!< in tree of listeners

	/*
	 *	To save time, we don't care about num_elements here.  Which means that we don't
	 *	need to cache or lookup the fr_worker_listen_t when we free a request.
	 */
	fr_dlist_head_t		dlist;		//!< of requests associated with this listener.
} fr_worker_listen_t;
147 | | |
148 | | |
149 | | static int8_t worker_listener_cmp(void const *one, void const *two) |
150 | 0 | { |
151 | 0 | fr_worker_listen_t const *a = one, *b = two; |
152 | |
|
153 | 0 | return CMP(a->listener, b->listener); |
154 | 0 | } |
155 | | |
156 | | |
/** Thread-exit destructor for the per-thread worker ring buffer
 *
 * Frees the ring buffer explicitly, rather than leaving it to process
 * exit, so memory checkers such as valgrind don't report it.
 *
 * @param[in] arg	the ring buffer to free.
 * @return the value returned by talloc_free().
 */
static int _fr_worker_rb_free(void *arg)
{
	int ret;

	ret = talloc_free(arg);

	return ret;
}
165 | | |
166 | | /** Initialise thread local storage |
167 | | * |
168 | | * @return fr_ring_buffer_t for messages |
169 | | */ |
170 | | static inline fr_ring_buffer_t *fr_worker_rb_init(void) |
171 | 0 | { |
172 | 0 | fr_ring_buffer_t *rb; |
173 | |
|
174 | 0 | rb = fr_worker_rb; |
175 | 0 | if (rb) return rb; |
176 | | |
177 | 0 | rb = fr_ring_buffer_create(NULL, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE); |
178 | 0 | if (!rb) { |
179 | 0 | fr_perror("Failed allocating memory for worker ring buffer"); |
180 | 0 | return NULL; |
181 | 0 | } |
182 | | |
183 | 0 | fr_atexit_thread_local(fr_worker_rb, _fr_worker_rb_free, rb); |
184 | |
|
185 | 0 | return rb; |
186 | 0 | } |
187 | | |
188 | | static inline bool is_worker_thread(fr_worker_t const *worker) |
189 | 0 | { |
190 | 0 | return (pthread_equal(pthread_self(), worker->thread_id) != 0); |
191 | 0 | } |
192 | | |
193 | | static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now); |
194 | | static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now); |
195 | | |
196 | | /** Callback which handles a message being received on the worker side. |
197 | | * |
198 | | * @param[in] ctx the worker |
199 | | * @param[in] ch the channel to drain |
200 | | * @param[in] cd the message (if any) to start with |
201 | | */ |
202 | | static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd) |
203 | 0 | { |
204 | 0 | fr_worker_t *worker = ctx; |
205 | |
|
206 | 0 | worker->stats.in++; |
207 | 0 | DEBUG3("Received request %" PRIu64 "", worker->stats.in); |
208 | 0 | cd->channel.ch = ch; |
209 | 0 | worker_request_bootstrap(worker, cd, fr_time()); |
210 | 0 | } |
211 | | |
212 | | static void worker_requests_cancel(fr_worker_channel_t *ch) |
213 | 0 | { |
214 | 0 | request_t *request; |
215 | |
|
216 | 0 | while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) { |
217 | 0 | unlang_interpret_signal(request, FR_SIGNAL_CANCEL); |
218 | 0 | } |
219 | 0 | } |
220 | | |
221 | | static void worker_exit(fr_worker_t *worker) |
222 | 0 | { |
223 | 0 | worker->exiting = true; |
224 | | |
225 | | /* |
226 | | * Don't allow the post event to run |
227 | | * any more requests. They'll be |
228 | | * signalled to stop before we exit. |
229 | | * |
230 | | * This only has an effect in single |
231 | | * threaded mode. |
232 | | */ |
233 | 0 | (void)fr_event_post_delete(worker->el, fr_worker_post_event, worker); |
234 | 0 | } |
235 | | |
/** Handle a control plane message sent to the worker via a channel
 *
 * Dispatches on the channel event decoded from the message:
 *  - FR_CHANNEL_DATA_READY_RESPONDER: drain new requests from the channel.
 *  - FR_CHANNEL_OPEN: store the channel in a free slot of worker->channel
 *    and attach a message set, which is later used to reserve NAK and
 *    reply messages.
 *  - FR_CHANNEL_CLOSE: cancel the slot's requests, acknowledge the close,
 *    free the message set, clear the slot, and call worker_exit() once no
 *    channels remain.
 *  - ERROR / EMPTY / NOOP: nothing to do.
 *
 * @param[in] ctx	the worker
 * @param[in] data	the message
 * @param[in] data_size	size of the data
 * @param[in] now	the current time
 */
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
{
	int			i;
	bool			ok, was_sleeping;
	fr_channel_t		*ch;
	fr_message_set_t	*ms;
	fr_channel_event_t	ce;
	fr_worker_t		*worker = ctx;

	/*
	 *	We were woken up by a signal to do something.  We're
	 *	not sleeping.  Keep the previous state so it can be
	 *	restored if there turns out to be no work.
	 */
	was_sleeping = worker->was_sleeping;
	worker->was_sleeping = false;

	ce = fr_channel_service_message(now, &ch, data, data_size);
	DEBUG3("Channel %s",
	       fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
	switch (ce) {
	case FR_CHANNEL_ERROR:
		return;

	case FR_CHANNEL_EMPTY:
		return;

	case FR_CHANNEL_NOOP:
		return;

	/*
	 *	Workers are responders; requestor-side events should
	 *	never be delivered here.
	 */
	case FR_CHANNEL_DATA_READY_REQUESTOR:
		fr_assert(0 == 1);
		break;

	case FR_CHANNEL_DATA_READY_RESPONDER:
		fr_assert(ch != NULL);

		/*
		 *	Nothing was actually read, so restore the previous
		 *	sleeping state.  Otherwise keep reading until the
		 *	channel has no more requests.
		 */
		if (!fr_channel_recv_request(ch)) {
			worker->was_sleeping = was_sleeping;

		} else while (fr_channel_recv_request(ch));
		break;

	case FR_CHANNEL_OPEN:
		fr_assert(ch != NULL);

		/*
		 *	Find the first free slot for the new channel.
		 */
		ok = false;
		for (i = 0; i < worker->config.max_channels; i++) {
			fr_assert(worker->channel[i].ch != ch);

			if (worker->channel[i].ch != NULL) continue;

			worker->channel[i].ch = ch;
			fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);

			DEBUG3("Received channel %p into array entry %d", ch, i);

			/*
			 *	Message set from which NAKs and replies for
			 *	this channel are reserved.
			 */
			ms = fr_message_set_create(worker, worker->config.message_set_size,
						   sizeof(fr_channel_data_t),
						   worker->config.ring_buffer_size, false);
			fr_assert(ms != NULL);
			fr_channel_responder_uctx_add(ch, ms);

			worker->num_channels++;
			ok = true;
			break;
		}

		/*
		 *	No free slot was found.
		 */
		fr_cond_assert(ok);
		break;

	case FR_CHANNEL_CLOSE:
		fr_assert(ch != NULL);

		ok = false;

		/*
		 *	Locate the signalling channel in the list
		 *	of channels.
		 */
		for (i = 0; i < worker->config.max_channels; i++) {
			if (!worker->channel[i].ch) continue;

			if (worker->channel[i].ch != ch) continue;

			worker_requests_cancel(&worker->channel[i]);

			ms = fr_channel_responder_uctx_get(ch);

			fr_assert_msg(fr_dlist_num_elements(&worker->channel[i].dlist) == 0,
				      "Network added messages to channel after sending FR_CHANNEL_CLOSE");

			/*
			 *	Acknowledge the close before the channel's
			 *	message set is garbage collected and freed.
			 */
			fr_channel_responder_ack_close(ch);
			fr_assert(ms != NULL);
			fr_message_set_gc(ms);
			talloc_free(ms);

			worker->channel[i].ch = NULL;

			fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
			fr_assert(worker->num_channels > 0);

			worker->num_channels--;
			ok = true;
			break;
		}

		/*
		 *	The channel being closed was not one of ours.
		 */
		fr_cond_assert(ok);

		/*
		 *	Our last input channel closed,
		 *	time to die.
		 */
		if (worker->num_channels == 0) worker_exit(worker);
		break;
	}
}
359 | | |
360 | | static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li) |
361 | 0 | { |
362 | 0 | fr_worker_listen_t *wl; |
363 | 0 | request_t *request; |
364 | |
|
365 | 0 | wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = li }); |
366 | 0 | if (!wl) return -1; |
367 | | |
368 | 0 | while ((request = fr_dlist_pop_head(&wl->dlist)) != NULL) { |
369 | 0 | RERROR("Cancelling request due to socket being closed"); |
370 | 0 | unlang_interpret_signal(request, FR_SIGNAL_CANCEL); |
371 | 0 | } |
372 | |
|
373 | 0 | (void) fr_rb_delete(worker->listeners, wl); |
374 | 0 | talloc_free(wl); |
375 | |
|
376 | 0 | return 0; |
377 | 0 | } |
378 | | |
379 | | |
380 | | /** A socket is going away, so clean up any requests which use this socket. |
381 | | * |
382 | | * @param[in] ctx the worker |
383 | | * @param[in] data the message |
384 | | * @param[in] data_size size of the data |
385 | | * @param[in] now the current time |
386 | | */ |
387 | | static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now) |
388 | 0 | { |
389 | 0 | fr_listen_t const *li; |
390 | 0 | fr_worker_t *worker = ctx; |
391 | |
|
392 | 0 | fr_assert(data_size == sizeof(li)); |
393 | |
|
394 | 0 | memcpy(&li, data, sizeof(li)); |
395 | |
|
396 | 0 | (void) fr_worker_listen_cancel_self(worker, li); |
397 | 0 | } |
398 | | |
399 | | /** Send a NAK to the network thread |
400 | | * |
401 | | * The network thread believes that a worker is running a request until that request has been NAK'd. |
402 | | * We typically NAK requests when they've been hanging around in the worker's backlog too long, |
403 | | * or there was an error executing the request. |
404 | | * |
405 | | * @param[in] worker the worker |
406 | | * @param[in] cd the message to NAK |
407 | | * @param[in] now when the message is NAKd |
408 | | */ |
409 | | static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now) |
410 | 0 | { |
411 | 0 | size_t size; |
412 | 0 | fr_channel_data_t *reply; |
413 | 0 | fr_channel_t *ch; |
414 | 0 | fr_message_set_t *ms; |
415 | 0 | fr_listen_t *listen; |
416 | |
|
417 | 0 | worker->num_naks++; |
418 | | |
419 | | /* |
420 | | * Cache the outbound channel. We'll need it later. |
421 | | */ |
422 | 0 | ch = cd->channel.ch; |
423 | 0 | listen = cd->listen; |
424 | | |
425 | | /* |
426 | | * If the channel has been closed, but we haven't |
427 | | * been informed, that is extremely bad. |
428 | | * |
429 | | * Try to continue working... but we'll likely |
430 | | * leak memory or SEGV soon. |
431 | | */ |
432 | 0 | if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send NAK but channel has been closed")) { |
433 | 0 | fr_message_done(&cd->m); |
434 | 0 | return; |
435 | 0 | } |
436 | | |
437 | 0 | ms = fr_channel_responder_uctx_get(ch); |
438 | 0 | fr_assert(ms != NULL); |
439 | |
|
440 | 0 | size = listen->app_io->default_reply_size; |
441 | 0 | if (!size) size = listen->app_io->default_message_size; |
442 | | |
443 | | /* |
444 | | * Allocate a default message size. |
445 | | */ |
446 | 0 | MEM(reply = (fr_channel_data_t *) fr_message_and_data_reserve(ms, size)); |
447 | | |
448 | | /* |
449 | | * Encode a NAK |
450 | | */ |
451 | 0 | if (listen->app_io->nak) { |
452 | 0 | size = listen->app_io->nak(listen, cd->packet_ctx, cd->m.data, |
453 | 0 | cd->m.data_size, reply->m.data, reply->m.rb_size); |
454 | 0 | } else { |
455 | 0 | size = 1; /* rely on them to figure it the heck out */ |
456 | 0 | } |
457 | |
|
458 | 0 | (void) fr_message_and_data_commit(ms, &reply->m, size); |
459 | | |
460 | | /* |
461 | | * Fill in the NAK. |
462 | | */ |
463 | 0 | reply->m.when = now; |
464 | 0 | reply->reply.cpu_time = worker->tracking.running_total; |
465 | 0 | reply->reply.processing_time = fr_time_delta_from_msec(1); /* @todo - set to something better? */ |
466 | 0 | reply->reply.request_time = cd->request.recv_time; |
467 | |
|
468 | 0 | reply->listen = cd->listen; |
469 | 0 | reply->packet_ctx = cd->packet_ctx; |
470 | | |
471 | | /* |
472 | | * Mark the original message as done. |
473 | | */ |
474 | 0 | fr_message_done(&cd->m); |
475 | | |
476 | | /* |
477 | | * Send the reply, which also polls the request queue. |
478 | | */ |
479 | 0 | if (fr_channel_send_reply(ch, reply) < 0) { |
480 | 0 | DEBUG2("Failed sending reply to channel"); |
481 | 0 | } |
482 | |
|
483 | 0 | worker->stats.out++; |
484 | 0 | } |
485 | | |
486 | | /** Signal the unlang interpreter that it needs to stop running the request |
487 | | * |
488 | | * Signalling is a synchronous operation. Whatever I/O requests the request |
489 | | * is currently performing are immediately cancelled, and all the frames are |
490 | | * popped off the unlang stack. |
491 | | * |
492 | | * Modules and unlang keywords explicitly register signal handlers to deal |
493 | | * with their yield points being cancelled/interrupted via this function. |
494 | | * |
495 | | * The caller should assume the request is no longer viable after calling |
496 | | * this function. |
497 | | * |
498 | | * @param[in] request request to cancel. The request may still run to completion. |
499 | | */ |
500 | | static void worker_stop_request(request_t *request) |
501 | 0 | { |
502 | | /* |
503 | | * Also marks the request as done and runs |
504 | | * the internal/external callbacs. |
505 | | */ |
506 | 0 | unlang_interpret_signal(request, FR_SIGNAL_CANCEL); |
507 | 0 | } |
508 | | |
509 | | /** Enforce max_request_time |
510 | | * |
511 | | * Run periodically, and tries to clean up requests which were received by the network |
512 | | * thread more than max_request_time seconds ago. In the interest of not adding a |
513 | | * timer for every packet, the requests are given a 1 second leeway. |
514 | | * |
515 | | * @param[in] tl the worker's timer list. |
516 | | * @param[in] when the current time |
517 | | * @param[in] uctx the request_t timing out. |
518 | | */ |
519 | | static void _worker_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx) |
520 | 0 | { |
521 | 0 | request_t *request = talloc_get_type_abort(uctx, request_t); |
522 | | |
523 | | /* |
524 | | * Waiting too long, delete it. |
525 | | */ |
526 | 0 | REDEBUG("Request has reached max_request_time - signalling it to stop"); |
527 | 0 | worker_stop_request(request); |
528 | | |
529 | | /* |
530 | | * This ensures the finally section can run timeout specific policies |
531 | | */ |
532 | 0 | request->rcode = RLM_MODULE_TIMEOUT; |
533 | 0 | } |
534 | | |
535 | | |
536 | | /** Start time tracking for a request, and mark it as runnable. |
537 | | * |
538 | | */ |
539 | | static int worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now) |
540 | 0 | { |
541 | | /* |
542 | | * New requests are inserted into the time order heap in |
543 | | * strict time priority. Once they are in the list, they |
544 | | * are only removed when the request is done / free'd. |
545 | | */ |
546 | 0 | fr_assert(!fr_timer_armed(request->timeout)); |
547 | |
|
548 | 0 | if (unlikely(fr_timer_in(request, worker->timeout, &request->timeout, worker->config.max_request_time, |
549 | 0 | true, _worker_request_timeout, request) < 0)) { |
550 | 0 | RERROR("Failed to set request timeout timer"); |
551 | 0 | return -1; |
552 | 0 | } |
553 | | |
554 | | /* |
555 | | * Bootstrap the async state machine with the initial |
556 | | * state of the request. |
557 | | */ |
558 | 0 | RDEBUG3("Time tracking started in yielded state"); |
559 | 0 | fr_time_tracking_start(&worker->tracking, &request->async->tracking, now); |
560 | 0 | fr_time_tracking_yield(&request->async->tracking, now); |
561 | 0 | worker->num_active++; |
562 | |
|
563 | 0 | fr_assert(!fr_heap_entry_inserted(request->runnable)); |
564 | 0 | (void) fr_heap_insert(&worker->runnable, request); |
565 | |
|
566 | 0 | return 0; |
567 | 0 | } |
568 | | |
569 | | static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now) |
570 | 0 | { |
571 | 0 | RDEBUG3("Time tracking ended"); |
572 | 0 | fr_time_tracking_end(&worker->predicted, &request->async->tracking, now); |
573 | 0 | fr_assert(worker->num_active > 0); |
574 | 0 | worker->num_active--; |
575 | |
|
576 | 0 | TALLOC_FREE(request->timeout); /* Disarm the reques timer */ |
577 | 0 | } |
578 | | |
579 | | /** Send a response packet to the network side |
580 | | * |
581 | | * @param[in] worker This worker. |
582 | | * @param[in] request we're sending a reply for. |
583 | | * @param[in] send_reply whether the network side sends a reply |
584 | | * @param[in] now The current time |
585 | | */ |
586 | | static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now) |
587 | 0 | { |
588 | 0 | fr_channel_data_t *reply; |
589 | 0 | fr_channel_t *ch; |
590 | 0 | fr_message_set_t *ms; |
591 | 0 | size_t size = 1; |
592 | |
|
593 | 0 | REQUEST_VERIFY(request); |
594 | | |
595 | | /* |
596 | | * If we're sending a reply, then it's no longer runnable. |
597 | | */ |
598 | 0 | fr_assert(!fr_heap_entry_inserted(request->runnable)); |
599 | |
|
600 | 0 | if (send_reply) { |
601 | 0 | size = request->async->listen->app_io->default_reply_size; |
602 | 0 | if (!size) size = request->async->listen->app_io->default_message_size; |
603 | 0 | } |
604 | | |
605 | | /* |
606 | | * Allocate and send the reply. |
607 | | */ |
608 | 0 | ch = request->async->channel; |
609 | 0 | fr_assert(ch != NULL); |
610 | | |
611 | | /* |
612 | | * If the channel has been closed, but we haven't |
613 | | * been informed, that is extremely bad. |
614 | | * |
615 | | * Try to continue working... but we'll likely |
616 | | * leak memory or SEGV soon. |
617 | | */ |
618 | 0 | if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send reply but channel has been closed")) { |
619 | 0 | return; |
620 | 0 | } |
621 | | |
622 | 0 | ms = fr_channel_responder_uctx_get(ch); |
623 | 0 | fr_assert(ms != NULL); |
624 | |
|
625 | 0 | reply = (fr_channel_data_t *) fr_message_and_data_reserve(ms, size); |
626 | 0 | fr_assert(reply != NULL); |
627 | | |
628 | | /* |
629 | | * Encode it, if required. |
630 | | */ |
631 | 0 | if (send_reply) { |
632 | 0 | ssize_t slen = 0; |
633 | 0 | fr_listen_t const *listen = request->async->listen; |
634 | |
|
635 | 0 | if (listen->app_io->encode) { |
636 | 0 | slen = listen->app_io->encode(listen->app_io_instance, request, |
637 | 0 | reply->m.data, reply->m.rb_size); |
638 | 0 | } else if (listen->app->encode) { |
639 | 0 | slen = listen->app->encode(listen->app_instance, request, |
640 | 0 | reply->m.data, reply->m.rb_size); |
641 | 0 | } |
642 | 0 | if (slen < 0) { |
643 | 0 | RPERROR("Failed encoding request"); |
644 | 0 | *reply->m.data = 0; |
645 | 0 | slen = 1; |
646 | 0 | } |
647 | | |
648 | | /* |
649 | | * Shrink the buffer to the actual packet size. |
650 | | * |
651 | | * This will ALWAYS return the same message as we put in. |
652 | | */ |
653 | 0 | fr_assert((size_t) slen <= reply->m.rb_size); |
654 | 0 | (void) fr_message_and_data_commit(ms, &reply->m, slen); |
655 | 0 | } |
656 | | |
657 | | /* |
658 | | * Fill in the rest of the fields in the channel message. |
659 | | * |
660 | | * sequence / ack will be filled in by fr_channel_send_reply() |
661 | | */ |
662 | 0 | reply->m.when = now; |
663 | 0 | reply->reply.cpu_time = worker->tracking.running_total; |
664 | 0 | reply->reply.processing_time = request->async->tracking.running_total; |
665 | 0 | reply->reply.request_time = request->async->recv_time; |
666 | |
|
667 | 0 | reply->listen = request->async->listen; |
668 | 0 | reply->packet_ctx = request->async->packet_ctx; |
669 | | |
670 | | /* |
671 | | * Update the various timers. |
672 | | */ |
673 | 0 | fr_time_elapsed_update(&worker->cpu_time, now, fr_time_add(now, reply->reply.processing_time)); |
674 | 0 | fr_time_elapsed_update(&worker->wall_clock, reply->reply.request_time, now); |
675 | |
|
676 | 0 | RDEBUG("Finished request"); |
677 | | |
678 | | /* |
679 | | * Send the reply, which also polls the request queue. |
680 | | */ |
681 | 0 | if (fr_channel_send_reply(ch, reply) < 0) { |
682 | | /* |
683 | | * Should only happen if the TO_REQUESTOR |
684 | | * channel is full, or it's not yet active. |
685 | | * |
686 | | * Not much we can do except complain |
687 | | * loudly and cleanup the request. |
688 | | */ |
689 | 0 | RPERROR("Failed sending reply to network thread"); |
690 | 0 | } |
691 | |
|
692 | 0 | worker->stats.out++; |
693 | |
|
694 | 0 | fr_assert(!fr_timer_armed(request->timeout)); |
695 | 0 | fr_assert(!fr_heap_entry_inserted(request->runnable)); |
696 | |
|
697 | 0 | fr_dlist_entry_unlink(&request->listen_entry); |
698 | |
|
699 | 0 | #ifndef NDEBUG |
700 | 0 | request->async->el = NULL; |
701 | 0 | request->async->channel = NULL; |
702 | 0 | request->async->packet_ctx = NULL; |
703 | 0 | request->async->listen = NULL; |
704 | 0 | #endif |
705 | 0 | } |
706 | | |
707 | | /* |
708 | | * talloc_typed_asprintf() is horrifically slow for printing |
709 | | * simple numbers. |
710 | | */ |
711 | | static char *itoa_internal(TALLOC_CTX *ctx, uint64_t number) |
712 | 0 | { |
713 | 0 | char buffer[32]; |
714 | 0 | char *p; |
715 | 0 | char const *numbers = "0123456789"; |
716 | |
|
717 | 0 | p = buffer + 30; |
718 | 0 | *(p--) = '\0'; |
719 | |
|
720 | 0 | while (number > 0) { |
721 | 0 | *(p--) = numbers[number % 10]; |
722 | 0 | number /= 10; |
723 | 0 | } |
724 | |
|
725 | 0 | if (p[1]) return talloc_strdup(ctx, p + 1); |
726 | | |
727 | 0 | return talloc_strdup(ctx, "0"); |
728 | 0 | } |
729 | | |
730 | | /** Initialize various request fields needed by the worker. |
731 | | * |
732 | | */ |
733 | | static inline CC_HINT(always_inline) |
734 | | void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now) |
735 | | { |
736 | | /* |
737 | | * For internal requests request->packet |
738 | | * and request->reply are already populated. |
739 | | */ |
740 | | if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false)); |
741 | | if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false)); |
742 | | |
743 | | request->packet->timestamp = now; |
744 | | request->async = talloc_zero(request, fr_async_t); |
745 | | request->async->recv_time = now; |
746 | | request->async->el = worker->el; |
747 | | fr_dlist_entry_init(&request->async->entry); |
748 | | } |
749 | | |
750 | | static inline CC_HINT(always_inline) |
751 | | void worker_request_name_number(request_t *request) |
752 | 0 | { |
753 | 0 | request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst); |
754 | 0 | if (request->name) talloc_const_free(request->name); |
755 | 0 | request->name = itoa_internal(request, request->number); |
756 | 0 | } |
757 | | |
758 | | static inline CC_HINT(always_inline) |
759 | | uint32_t worker_num_requests(fr_worker_t *worker) |
760 | 0 | { |
761 | 0 | return fr_timer_list_num_events(worker->timeout); |
762 | 0 | } |
763 | | |
764 | | static int _worker_request_deinit(request_t *request, UNUSED void *uctx) |
765 | 0 | { |
766 | 0 | return request_slab_deinit(request); |
767 | 0 | } |
768 | | |
769 | | static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now) |
770 | | { |
771 | | int ret = -1; |
772 | | request_t *request; |
773 | | fr_listen_t *listen = cd->listen; |
774 | | |
775 | | if (worker_num_requests(worker) >= (uint32_t) worker->config.max_requests) { |
776 | | RATE_LIMIT_GLOBAL(ERROR, "Worker at max requests"); |
777 | | goto nak; |
778 | | } |
779 | | |
780 | | /* |
781 | | * Receive a message to the worker queue, and decode it |
782 | | * to a request. |
783 | | */ |
784 | | fr_assert(listen != NULL); |
785 | | |
786 | | request = request_slab_reserve(worker->slab); |
787 | | if (!request) { |
788 | | RATE_LIMIT_GLOBAL(ERROR, "Worker failed allocating new request"); |
789 | | goto nak; |
790 | | } |
791 | | /* |
792 | | * Ensures that both the deinit function runs AND |
793 | | * the request is returned to the slab if something |
794 | | * calls talloc_free() on it. |
795 | | */ |
796 | | request_slab_element_set_destructor(request, _worker_request_deinit, worker); |
797 | | |
798 | | /* |
799 | | * Have to initialise the request manually because namspace |
800 | | * changes based on the listener that allocated it. |
801 | | */ |
802 | | if (request_init(request, REQUEST_TYPE_EXTERNAL, (&(request_init_args_t){ .namespace = listen->dict })) < 0) { |
803 | | request_slab_release(request); |
804 | | goto nak; |
805 | | } |
806 | | |
807 | | /* |
808 | | * Do normal worker init that's shared between internal |
809 | | * and external requests. |
810 | | */ |
811 | | worker_request_init(worker, request, now); |
812 | | worker_request_name_number(request); |
813 | | |
814 | | /* |
815 | | * Associate our interpreter with the request |
816 | | */ |
817 | | unlang_interpret_set(request, worker->intp); |
818 | | |
819 | | request->packet->timestamp = cd->request.recv_time; /* Legacy - Remove once everything looks at request->async */ |
820 | | |
821 | | /* |
822 | | * Update the transport-specific fields. |
823 | | */ |
824 | | request->async->channel = cd->channel.ch; |
825 | | |
826 | | request->async->recv_time = cd->request.recv_time; |
827 | | |
828 | | request->async->listen = listen; |
829 | | request->async->packet_ctx = cd->packet_ctx; |
830 | | request->priority = cd->priority; |
831 | | |
832 | | /* |
833 | | * Now that the "request" structure has been initialized, go decode the packet. |
834 | | * |
835 | | * Note that this also sets the "async process" function. |
836 | | */ |
837 | | if (listen->app->decode) { |
838 | | ret = listen->app->decode(listen->app_instance, request, cd->m.data, cd->m.data_size); |
839 | | } else if (listen->app_io->decode) { |
840 | | ret = listen->app_io->decode(listen->app_io_instance, request, cd->m.data, cd->m.data_size); |
841 | | } |
842 | | |
843 | | if (ret < 0) { |
844 | | fail: |
845 | | fr_assert(talloc_parent(request->stack) == request); |
846 | | request_slab_release(request); |
847 | | |
848 | | nak: |
849 | | worker_nak(worker, cd, now); |
850 | | return; |
851 | | } |
852 | | |
853 | | /* |
854 | | * Set the entry point for this virtual server. |
855 | | */ |
856 | | if (unlang_call_push(NULL, request, cd->listen->server_cs, UNLANG_TOP_FRAME) < 0) { |
857 | | RERROR("Protocol failed to set 'process' function"); |
858 | | goto fail; |
859 | | } |
860 | | |
861 | | /* |
862 | | * Look for conflicting / duplicate packets, but only if |
863 | | * requested to do so. |
864 | | */ |
865 | | if (request->async->listen->track_duplicates) { |
866 | | request_t *old; |
867 | | |
868 | | old = fr_rb_find(worker->dedup, request); |
869 | | if (!old) { |
870 | | goto insert_new; |
871 | | } |
872 | | |
873 | | fr_assert(old->async->listen == request->async->listen); |
874 | | fr_assert(old->async->channel == request->async->channel); |
875 | | |
876 | | /* |
877 | | * There's a new packet. Do we keep the old one, |
878 | | * or the new one? This decision is made by |
879 | | * checking the recv_time, which is a |
880 | | * nanosecond-resolution timer. If the time is |
881 | | * identical, then the new packet is the same as |
882 | | * the old one. |
883 | | * |
884 | | * If the new packet is a duplicate of the old |
885 | | * one, then we can just discard the new one. We |
886 | | * have to tell the channel that we've "eaten" |
887 | | * this reply, so the sequence number should |
888 | | * increase. |
889 | | * |
890 | | * @todo - fix the channel code to do queue |
891 | | * depth, and not sequence / ack. |
892 | | */ |
893 | | if (fr_time_eq(old->async->recv_time, request->async->recv_time)) { |
894 | | RWARN("Discarding duplicate of request (%"PRIu64")", old->number); |
895 | | |
896 | | fr_channel_null_reply(request->async->channel); |
897 | | request_slab_release(request); |
898 | | |
899 | | /* |
900 | | * Signal there's a dup, and ignore the |
901 | | * return code. We don't bother replying |
902 | | * here, as an FD event or timer will |
903 | | * wake up the request, and cause it to |
904 | | * continue. |
905 | | * |
906 | | * @todo - the old request is NOT |
907 | | * running, but is yielded. It MAY clean |
908 | | * itself up, or do something... |
909 | | */ |
910 | | unlang_interpret_signal(old, FR_SIGNAL_DUP); |
911 | | worker->stats.dup++; |
912 | | |
913 | | fr_message_done(&cd->m); |
914 | | return; |
915 | | } |
916 | | |
917 | | /* |
918 | | * Stop the old request, and decrement the number |
919 | | * of active requests. |
920 | | */ |
921 | | RWARN("Got conflicting packet for request (%" PRIu64 "), telling old request to stop", old->number); |
922 | | |
923 | | worker_stop_request(old); |
924 | | worker->stats.dropped++; |
925 | | (void) fr_rb_remove(worker->dedup, old); /* remove, but do NOT free it */ |
926 | | |
927 | | insert_new: |
928 | | (void) fr_rb_insert(worker->dedup, request); |
929 | | } |
930 | | |
931 | | if (worker_request_time_tracking_start(worker, request, now) < 0) { |
932 | | if (request->async->listen->track_duplicates) (void) fr_rb_remove(worker->dedup, request); |
933 | | goto fail; |
934 | | } |
935 | | |
936 | | /* |
937 | | * We're done with this message. |
938 | | */ |
939 | | fr_message_done(&cd->m); |
940 | | |
941 | | { |
942 | | fr_worker_listen_t *wl; |
943 | | |
944 | | wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = listen }); |
945 | | if (!wl) { |
946 | | MEM(wl = talloc_zero(worker, fr_worker_listen_t)); |
947 | | fr_dlist_init(&wl->dlist, request_t, listen_entry); |
948 | | wl->listener = listen; |
949 | | |
950 | | (void) fr_rb_insert(worker->listeners, wl); |
951 | | } |
952 | | |
953 | | fr_dlist_insert_tail(&wl->dlist, request); |
954 | | } |
955 | | } |
956 | | |
957 | | /** |
958 | | * Track a request_t in the "runnable" heap. |
959 | | * Higher priorities take precedence, followed by lower sequence numbers |
960 | | */ |
961 | | static int8_t worker_runnable_cmp(void const *one, void const *two) |
962 | 0 | { |
963 | 0 | request_t const *a = one, *b = two; |
964 | 0 | int ret; |
965 | | |
966 | | /* |
967 | | * Prefer higher priority packets. |
968 | | */ |
969 | 0 | ret = CMP_PREFER_LARGER(b->priority, a->priority); |
970 | 0 | if (ret != 0) return ret; |
971 | | |
972 | | /* |
973 | | * Prefer packets which are further along in their processing sequence. |
974 | | */ |
975 | 0 | ret = CMP_PREFER_LARGER(a->sequence, b->sequence); |
976 | 0 | if (ret != 0) return ret; |
977 | | |
978 | | /* |
979 | | * Smaller timestamp (i.e. earlier) is more important. |
980 | | */ |
981 | 0 | return fr_time_cmp(a->async->recv_time, b->async->recv_time); |
982 | 0 | } |
983 | | |
984 | | /** |
985 | | * Track a request_t in the "dedup" tree |
986 | | */ |
987 | | static int8_t worker_dedup_cmp(void const *one, void const *two) |
988 | 0 | { |
989 | 0 | int ret; |
990 | 0 | request_t const *a = one, *b = two; |
991 | |
|
992 | 0 | ret = CMP(a->async->listen, b->async->listen); |
993 | 0 | if (ret) return ret; |
994 | | |
995 | 0 | return CMP(a->async->packet_ctx, b->async->packet_ctx); |
996 | 0 | } |
997 | | |
998 | | /** Destroy a worker |
999 | | * |
1000 | | * The input channels are signaled, and local messages are cleaned up. |
1001 | | * |
1002 | | * This should be called to _EXPLICITLY_ destroy a worker, when some fatal |
1003 | | * error has occurred on the worker side, and we need to destroy it. |
1004 | | * |
1005 | | * We signal all pending requests in the backlog to stop, and tell the |
1006 | | * network side that it should not send us any more requests. |
1007 | | * |
1008 | | * @param[in] worker the worker to destroy. |
1009 | | */ |
1010 | | void fr_worker_destroy(fr_worker_t *worker) |
1011 | 0 | { |
1012 | 0 | int i, count, ret; |
1013 | | |
1014 | | // WORKER_VERIFY; |
1015 | | |
1016 | | /* |
1017 | | * Stop any new requests running with this interpreter |
1018 | | */ |
1019 | 0 | unlang_interpret_set_thread_default(NULL); |
1020 | | |
1021 | | /* |
1022 | | * Destroy all of the active requests. These are ones |
1023 | | * which are still waiting for timers or file descriptor |
1024 | | * events. |
1025 | | */ |
1026 | 0 | count = 0; |
1027 | | |
1028 | | /* |
1029 | | * Force the timeout event to fire for all requests that |
1030 | | * are still running. |
1031 | | */ |
1032 | 0 | ret = fr_timer_list_force_run(worker->timeout); |
1033 | 0 | if (unlikely(ret < 0)) { |
1034 | 0 | fr_assert_msg(0, "Failed to force run the timeout list"); |
1035 | 0 | } else { |
1036 | 0 | count += ret; |
1037 | 0 | } |
1038 | |
|
1039 | 0 | fr_assert(fr_heap_num_elements(worker->runnable) == 0); |
1040 | |
|
1041 | 0 | DEBUG("Worker is exiting - stopped %u requests", count); |
1042 | | |
1043 | | /* |
1044 | | * Signal the channels that we're closing. |
1045 | | * |
1046 | | * The other end owns the channel, and will take care of |
1047 | | * popping messages in the TO_RESPONDER queue, and marking |
1048 | | * them FR_MESSAGE_DONE. It will ignore the messages in |
1049 | | * the TO_REQUESTOR queue, as we own those. They will be |
1050 | | * automatically freed when our talloc context is freed. |
1051 | | */ |
1052 | 0 | for (i = 0; i < worker->config.max_channels; i++) { |
1053 | 0 | if (!worker->channel[i].ch) continue; |
1054 | | |
1055 | 0 | worker_requests_cancel(&worker->channel[i]); |
1056 | |
|
1057 | 0 | fr_assert_msg(fr_dlist_num_elements(&worker->channel[i].dlist) == 0, |
1058 | 0 | "Pending messages in channel after cancelling request"); |
1059 | |
|
1060 | 0 | fr_channel_responder_ack_close(worker->channel[i].ch); |
1061 | 0 | } |
1062 | |
|
1063 | 0 | talloc_free(worker); |
1064 | 0 | } |
1065 | | |
1066 | | /** Internal request (i.e. one generated by the interpreter) is now complete |
1067 | | * |
1068 | | */ |
1069 | | static void _worker_request_internal_init(request_t *request, void *uctx) |
1070 | 0 | { |
1071 | 0 | fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t); |
1072 | 0 | fr_time_t now = fr_time(); |
1073 | |
|
1074 | 0 | worker_request_init(worker, request, now); |
1075 | | |
1076 | | /* |
1077 | | * Requests generated by the interpreter |
1078 | | * are always marked up as internal. |
1079 | | */ |
1080 | 0 | fr_assert(request_is_internal(request)); |
1081 | 0 | if (worker_request_time_tracking_start(worker, request, now) < 0) { |
1082 | 0 | unlang_interpret_signal(request, FR_SIGNAL_CANCEL); |
1083 | 0 | } |
1084 | 0 | } |
1085 | | |
1086 | | |
1087 | | /** External request is now complete |
1088 | | * |
1089 | | */ |
1090 | | static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx) |
1091 | 0 | { |
1092 | 0 | fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t); |
1093 | 0 | fr_time_t now = fr_time(); |
1094 | | |
1095 | | /* |
1096 | | * All external requests MUST have a listener. |
1097 | | */ |
1098 | 0 | fr_assert(request_is_external(request)); |
1099 | 0 | fr_assert(request->async->listen != NULL); |
1100 | | |
1101 | | /* |
1102 | | * Only real packets are in the dedup tree. And even |
1103 | | * then, only some of the time. |
1104 | | */ |
1105 | 0 | if (request->async->listen->track_duplicates && fr_rb_node_inline_in_tree(&request->dedup_node)) { |
1106 | 0 | (void) fr_rb_delete(worker->dedup, request); |
1107 | 0 | } |
1108 | | |
1109 | | /* |
1110 | | * If we're running a real request, then the final |
1111 | | * indentation MUST be zero. Otherwise we skipped |
1112 | | * something! |
1113 | | * |
1114 | | * Also check that the request is NOT marked as |
1115 | | * "yielded", but is in fact done. |
1116 | | * |
1117 | | * @todo - check that the stack is at frame 0, otherwise |
1118 | | * more things have gone wrong. |
1119 | | */ |
1120 | 0 | fr_assert_msg(request_is_internal(request) || request_is_detached(request) || (request->log.indent.unlang == 0), |
1121 | 0 | "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang); |
1122 | 0 | fr_assert_msg(!unlang_interpret_is_resumable(request), |
1123 | 0 | "Request %s is marked as yielded at end of processing", request->name); |
1124 | 0 | fr_assert_msg(unlang_interpret_stack_depth(request) == 0, |
1125 | 0 | "Request %s stack depth %u > 0", request->name, unlang_interpret_stack_depth(request)); |
1126 | 0 | RDEBUG("Done request"); |
1127 | | |
1128 | | /* |
1129 | | * The request is done. Track that. |
1130 | | */ |
1131 | 0 | worker_request_time_tracking_end(worker, request, now); |
1132 | | |
1133 | | /* |
1134 | | * Remove it from the list of requests associated with this channel. |
1135 | | */ |
1136 | 0 | if (fr_dlist_entry_in_list(&request->async->entry)) { |
1137 | 0 | fr_dlist_entry_unlink(&request->async->entry); |
1138 | 0 | } |
1139 | | |
1140 | | /* |
1141 | | * These conditions are true when the server is |
1142 | | * exiting and we're stopping all the requests. |
1143 | | * |
1144 | | * This should never happen otherwise. |
1145 | | */ |
1146 | 0 | if (unlikely(!fr_channel_active(request->async->channel))) { |
1147 | 0 | fr_dlist_entry_unlink(&request->listen_entry); |
1148 | 0 | request_slab_release(request); |
1149 | 0 | return; |
1150 | 0 | } |
1151 | | |
1152 | 0 | worker_send_reply(worker, request, !unlang_request_is_cancelled(request), now); |
1153 | 0 | request_slab_release(request); |
1154 | 0 | } |
1155 | | |
1156 | | /** Internal request (i.e. one generated by the interpreter) is now complete |
1157 | | * |
1158 | | * Whatever generated the request is now responsible for freeing it. |
1159 | | */ |
1160 | | static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx) |
1161 | 0 | { |
1162 | 0 | fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t); |
1163 | |
|
1164 | 0 | worker_request_time_tracking_end(worker, request, fr_time()); |
1165 | |
|
1166 | 0 | fr_assert(!fr_heap_entry_inserted(request->runnable)); |
1167 | 0 | fr_assert(!fr_timer_armed(request->timeout)); |
1168 | 0 | fr_assert(!fr_dlist_entry_in_list(&request->async->entry)); |
1169 | 0 | } |
1170 | | |
1171 | | /** Detached request (i.e. one generated by the interpreter with no parent) is now complete |
1172 | | * |
1173 | | * As the request has no parent, then there's nothing to free it |
1174 | | * so we have to. |
1175 | | */ |
1176 | | static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx) |
1177 | 0 | { |
1178 | | /* |
1179 | | * No time tracking for detached requests |
1180 | | * so we don't need to call |
1181 | | * worker_request_time_tracking_end. |
1182 | | */ |
1183 | 0 | fr_assert(!fr_heap_entry_inserted(request->runnable)); |
1184 | | |
1185 | | /* |
1186 | | * Normally worker_request_time_tracking_end |
1187 | | * would remove the request from the time |
1188 | | * order heap, but we need to do that for |
1189 | | * detached requests. |
1190 | | */ |
1191 | 0 | TALLOC_FREE(request->timeout); |
1192 | |
|
1193 | 0 | fr_assert(!fr_dlist_entry_in_list(&request->async->entry)); |
1194 | | |
1195 | | /* |
1196 | | * Detached requests have to be freed by us |
1197 | | * as nothing else can free them. |
1198 | | * |
1199 | | * All other requests must be freed by the |
1200 | | * code which allocated them. |
1201 | | */ |
1202 | 0 | talloc_free(request); |
1203 | 0 | } |
1204 | | |
1205 | | |
1206 | | /** Make us responsible for running the request |
1207 | | * |
1208 | | */ |
1209 | | static void _worker_request_detach(request_t *request, void *uctx) |
1210 | 0 | { |
1211 | 0 | fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t); |
1212 | 0 | fr_time_t now = fr_time(); |
1213 | |
|
1214 | 0 | RDEBUG4("%s - Request detaching", __FUNCTION__); |
1215 | |
|
1216 | 0 | if (request_is_detachable(request)) { |
1217 | | /* |
1218 | | * End the time tracking... We don't track detached requests, |
1219 | | * because they don't contribute for the time consumed by an |
1220 | | * external request. |
1221 | | */ |
1222 | 0 | if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) { |
1223 | 0 | RDEBUG3("Forcing time tracking to running state, from yielded, for request detach"); |
1224 | 0 | fr_time_tracking_resume(&request->async->tracking, now); |
1225 | 0 | } |
1226 | 0 | worker_request_time_tracking_end(worker, request, now); |
1227 | |
|
1228 | 0 | if (request_detach(request) < 0) RPEDEBUG("Failed detaching request"); |
1229 | |
|
1230 | 0 | RDEBUG3("Request is detached"); |
1231 | 0 | } else { |
1232 | 0 | fr_assert_msg(0, "Request is not detachable"); |
1233 | 0 | } |
1234 | |
|
1235 | 0 | return; |
1236 | 0 | } |
1237 | | |
1238 | | /** Request is now runnable |
1239 | | * |
1240 | | */ |
1241 | | static void _worker_request_runnable(request_t *request, void *uctx) |
1242 | 0 | { |
1243 | 0 | fr_worker_t *worker = uctx; |
1244 | |
|
1245 | 0 | RDEBUG4("%s - Request marked as runnable", __FUNCTION__); |
1246 | 0 | fr_heap_insert(&worker->runnable, request); |
1247 | 0 | } |
1248 | | |
1249 | | /** Interpreter yielded request |
1250 | | * |
1251 | | */ |
1252 | | static void _worker_request_yield(request_t *request, UNUSED void *uctx) |
1253 | 0 | { |
1254 | 0 | RDEBUG4("%s - Request yielded", __FUNCTION__); |
1255 | 0 | if (likely(!request_is_detached(request))) fr_time_tracking_yield(&request->async->tracking, fr_time()); |
1256 | 0 | } |
1257 | | |
1258 | | /** Interpreter is starting to work on request again |
1259 | | * |
1260 | | */ |
1261 | | static void _worker_request_resume(request_t *request, UNUSED void *uctx) |
1262 | 0 | { |
1263 | 0 | RDEBUG4("%s - Request resuming", __FUNCTION__); |
1264 | 0 | if (likely(!request_is_detached(request))) fr_time_tracking_resume(&request->async->tracking, fr_time()); |
1265 | 0 | } |
1266 | | |
1267 | | /** Check if a request is scheduled |
1268 | | * |
1269 | | */ |
1270 | | static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx) |
1271 | 0 | { |
1272 | 0 | return fr_heap_entry_inserted(request->runnable); |
1273 | 0 | } |
1274 | | |
1275 | | /** Update a request's priority |
1276 | | * |
1277 | | */ |
1278 | | static void _worker_request_prioritise(request_t *request, void *uctx) |
1279 | 0 | { |
1280 | 0 | fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t); |
1281 | |
|
1282 | 0 | RDEBUG4("%s - Request priority changed", __FUNCTION__); |
1283 | | |
1284 | | /* Extract the request from the runnable queue _if_ it's in the runnable queue */ |
1285 | 0 | if (fr_heap_extract(&worker->runnable, request) < 0) return; |
1286 | | |
1287 | | /* Reinsert it to re-evaluate its new priority */ |
1288 | 0 | fr_heap_insert(&worker->runnable, request); |
1289 | 0 | } |
1290 | | |
1291 | | /** Run a request |
1292 | | * |
1293 | | * Until it either yields, or is done. |
1294 | | * |
1295 | | * This function is also responsible for sending replies, and |
1296 | | * cleaning up the request. |
1297 | | * |
1298 | | * @param[in] worker the worker |
1299 | | * @param[in] start the current time |
1300 | | */ |
1301 | | static inline CC_HINT(always_inline) void worker_run_request(fr_worker_t *worker, fr_time_t start) |
1302 | 0 | { |
1303 | 0 | request_t *request; |
1304 | 0 | fr_time_t now; |
1305 | |
|
1306 | 0 | WORKER_VERIFY; |
1307 | |
|
1308 | 0 | now = start; |
1309 | | |
1310 | | /* |
1311 | | * Busy-loop running requests for 1ms. We still poll the |
1312 | | * event loop 1000 times a second, OR when there's no |
1313 | | * more work to do. This allows us to make progress with |
1314 | | * ongoing requests, at the expense of sometimes ignoring |
1315 | | * new ones. |
1316 | | */ |
1317 | 0 | while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) && |
1318 | 0 | ((request = fr_heap_pop(&worker->runnable)) != NULL)) { |
1319 | |
|
1320 | 0 | REQUEST_VERIFY(request); |
1321 | 0 | fr_assert(!fr_heap_entry_inserted(request->runnable)); |
1322 | | |
1323 | | /* |
1324 | | * For real requests, if the channel is gone, |
1325 | | * just stop the request and free it. |
1326 | | */ |
1327 | 0 | if (request->async->channel && !fr_channel_active(request->async->channel)) { |
1328 | 0 | worker_stop_request(request); |
1329 | 0 | continue; |
1330 | 0 | } |
1331 | | |
1332 | 0 | (void)unlang_interpret(request, UNLANG_REQUEST_RESUME); |
1333 | |
|
1334 | 0 | now = fr_time(); |
1335 | 0 | } |
1336 | 0 | } |
1337 | | |
1338 | | /** Create a worker |
1339 | | * |
1340 | | * @param[in] ctx the talloc context |
1341 | | * @param[in] name the name of this worker |
1342 | | * @param[in] el the event list |
1343 | | * @param[in] logger the destination for all logging messages |
1344 | | * @param[in] lvl log level |
1345 | | * @param[in] config various configuration parameters |
1346 | | * @return |
1347 | | * - NULL on error |
1348 | | * - fr_worker_t on success |
1349 | | */ |
1350 | | fr_worker_t *fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, |
1351 | | fr_worker_config_t *config) |
1352 | 0 | { |
1353 | 0 | fr_worker_t *worker; |
1354 | |
|
1355 | 0 | worker = talloc_zero(ctx, fr_worker_t); |
1356 | 0 | if (!worker) { |
1357 | 0 | nomem: |
1358 | 0 | fr_strerror_const("Failed allocating memory"); |
1359 | 0 | return NULL; |
1360 | 0 | } |
1361 | | |
1362 | 0 | worker->name = talloc_strdup(worker, name); /* thread locality */ |
1363 | |
|
1364 | 0 | if (config) worker->config = *config; |
1365 | |
|
1366 | 0 | #define CHECK_CONFIG(_x, _min, _max) do { \ |
1367 | 0 | if (!worker->config._x) worker->config._x = _min; \ |
1368 | 0 | if (worker->config._x < _min) worker->config._x = _min; \ |
1369 | 0 | if (worker->config._x > _max) worker->config._x = _max; \ |
1370 | 0 | } while (0) |
1371 | |
|
1372 | 0 | #define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \ |
1373 | 0 | if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \ |
1374 | 0 | if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \ |
1375 | 0 | } while (0) |
1376 | |
|
1377 | 0 | CHECK_CONFIG(max_requests,1024,(1 << 30)); |
1378 | 0 | CHECK_CONFIG(max_channels, 64, 1024); |
1379 | 0 | CHECK_CONFIG(reuse.child_pool_size, 4096, 65536); |
1380 | 0 | CHECK_CONFIG(message_set_size, 1024, 8192); |
1381 | 0 | CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20)); |
1382 | 0 | CHECK_CONFIG_TIME_DELTA(max_request_time, fr_time_delta_from_sec(5), fr_time_delta_from_sec(120)); |
1383 | |
|
1384 | 0 | worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels); |
1385 | 0 | if (!worker->channel) { |
1386 | 0 | talloc_free(worker); |
1387 | 0 | goto nomem; |
1388 | 0 | } |
1389 | | |
1390 | 0 | worker->thread_id = pthread_self(); |
1391 | 0 | worker->el = el; |
1392 | 0 | worker->log = logger; |
1393 | 0 | worker->lvl = lvl; |
1394 | | |
1395 | | /* |
1396 | | * The worker thread starts now. Manually initialize it, |
1397 | | * because we're tracking request time, not the time that |
1398 | | * the worker thread is running. |
1399 | | */ |
1400 | 0 | memset(&worker->tracking, 0, sizeof(worker->tracking)); |
1401 | |
|
1402 | 0 | worker->aq_control = fr_atomic_queue_talloc(worker, 1024); |
1403 | 0 | if (!worker->aq_control) { |
1404 | 0 | fr_strerror_const("Failed creating atomic queue"); |
1405 | 0 | fail: |
1406 | 0 | talloc_free(worker); |
1407 | 0 | return NULL; |
1408 | 0 | } |
1409 | | |
1410 | 0 | worker->control = fr_control_create(worker, el, worker->aq_control, 7); |
1411 | 0 | if (!worker->control) { |
1412 | 0 | fr_strerror_const_push("Failed creating control plane"); |
1413 | 0 | goto fail; |
1414 | 0 | } |
1415 | | |
1416 | 0 | if (fr_control_callback_add(&worker->control, FR_CONTROL_ID_CHANNEL, worker, worker_channel_callback) < 0) { |
1417 | 0 | fr_strerror_const_push("Failed adding control channel"); |
1418 | 0 | goto fail; |
1419 | 0 | } |
1420 | | |
1421 | 0 | if (fr_control_callback_add(&worker->control, FR_CONTROL_ID_LISTEN_DEAD, worker, worker_listen_cancel_callback) < 0) { |
1422 | 0 | fr_strerror_const_push("Failed adding callback for listeners"); |
1423 | 0 | goto fail; |
1424 | 0 | } |
1425 | | |
1426 | 0 | if (fr_control_open(worker->control) < 0) { |
1427 | 0 | fr_strerror_const_push("Failed opening control plane"); |
1428 | 0 | goto fail; |
1429 | 0 | } |
1430 | | |
1431 | 0 | worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable, 0); |
1432 | 0 | if (!worker->runnable) { |
1433 | 0 | fr_strerror_const("Failed creating runnable heap"); |
1434 | 0 | goto fail; |
1435 | 0 | } |
1436 | | |
1437 | 0 | worker->timeout = fr_timer_list_ordered_alloc(worker, el->tl); |
1438 | 0 | if (!worker->timeout) { |
1439 | 0 | fr_strerror_const("Failed creating timeouts list"); |
1440 | 0 | goto fail; |
1441 | 0 | } |
1442 | | |
1443 | 0 | worker->dedup = fr_rb_inline_talloc_alloc(worker, request_t, dedup_node, worker_dedup_cmp, NULL); |
1444 | 0 | if (!worker->dedup) { |
1445 | 0 | fr_strerror_const("Failed creating de_dup tree"); |
1446 | 0 | goto fail; |
1447 | 0 | } |
1448 | | |
1449 | 0 | worker->listeners = fr_rb_inline_talloc_alloc(worker, fr_worker_listen_t, node, worker_listener_cmp, NULL); |
1450 | 0 | if (!worker->listeners) { |
1451 | 0 | fr_strerror_const("Failed creating listener tree"); |
1452 | 0 | goto fail; |
1453 | 0 | } |
1454 | | |
1455 | 0 | worker->intp = unlang_interpret_init(worker, el, |
1456 | 0 | &(unlang_request_func_t){ |
1457 | 0 | .init_internal = _worker_request_internal_init, |
1458 | |
|
1459 | 0 | .done_external = _worker_request_done_external, |
1460 | 0 | .done_internal = _worker_request_done_internal, |
1461 | 0 | .done_detached = _worker_request_done_detached, |
1462 | |
|
1463 | 0 | .detach = _worker_request_detach, |
1464 | 0 | .yield = _worker_request_yield, |
1465 | 0 | .resume = _worker_request_resume, |
1466 | 0 | .mark_runnable = _worker_request_runnable, |
1467 | |
|
1468 | 0 | .scheduled = _worker_request_scheduled, |
1469 | 0 | .prioritise = _worker_request_prioritise |
1470 | 0 | }, |
1471 | 0 | worker); |
1472 | 0 | if (!worker->intp){ |
1473 | 0 | fr_strerror_const("Failed initialising interpreter"); |
1474 | 0 | goto fail; |
1475 | 0 | } |
1476 | | |
1477 | 0 | { |
1478 | 0 | if (!worker->config.reuse.child_pool_size) worker->config.reuse.child_pool_size = REQUEST_POOL_SIZE; |
1479 | 0 | if (!worker->config.reuse.num_children) worker->config.reuse.num_children = REQUEST_POOL_NUM_OBJECTS; |
1480 | |
|
1481 | 0 | if (!(worker->slab = request_slab_list_alloc(worker, el, &worker->config.reuse, NULL, NULL, |
1482 | 0 | UNCONST(void *, worker), true, false))) { |
1483 | 0 | fr_strerror_const("Failed creating request slab list"); |
1484 | 0 | goto fail; |
1485 | 0 | } |
1486 | 0 | } |
1487 | | |
1488 | 0 | unlang_interpret_set_thread_default(worker->intp); |
1489 | |
|
1490 | 0 | return worker; |
1491 | 0 | } |
1492 | | |
1493 | | |
1494 | | /** The main loop and entry point of the stand-alone worker thread. |
1495 | | * |
1496 | | * Where there is only one thread, the event loop runs fr_worker_pre_event() and fr_worker_post_event() |
1497 | | * instead, And then fr_worker_post_event() takes care of calling worker_run_request() to actually run the |
1498 | | * request. |
1499 | | * |
1500 | | * @param[in] worker the worker data structure to manage |
1501 | | */ |
1502 | | void fr_worker(fr_worker_t *worker) |
1503 | 0 | { |
1504 | 0 | WORKER_VERIFY; |
1505 | |
|
1506 | 0 | while (true) { |
1507 | 0 | bool wait_for_event; |
1508 | 0 | int num_events; |
1509 | |
|
1510 | 0 | WORKER_VERIFY; |
1511 | | |
1512 | | /* |
1513 | | * There are runnable requests. We still service |
1514 | | * the event loop, but we don't wait for events. |
1515 | | */ |
1516 | 0 | wait_for_event = (fr_heap_num_elements(worker->runnable) == 0); |
1517 | 0 | if (wait_for_event) { |
1518 | 0 | if (worker->exiting && (worker_num_requests(worker) == 0)) break; |
1519 | | |
1520 | 0 | DEBUG4("Ready to process requests"); |
1521 | 0 | } |
1522 | | |
1523 | | /* |
1524 | | * Check the event list. If there's an error |
1525 | | * (e.g. exit), we stop looping and clean up. |
1526 | | */ |
1527 | 0 | DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait"); |
1528 | 0 | num_events = fr_event_corral(worker->el, fr_time(), wait_for_event); |
1529 | 0 | if (num_events < 0) { |
1530 | 0 | if (fr_event_loop_exiting(worker->el)) { |
1531 | 0 | DEBUG4("Event loop exiting"); |
1532 | 0 | break; |
1533 | 0 | } |
1534 | | |
1535 | 0 | PERROR("Failed retrieving events"); |
1536 | 0 | break; |
1537 | 0 | } |
1538 | | |
1539 | 0 | DEBUG4("%u event(s) pending", num_events); |
1540 | | |
1541 | | /* |
1542 | | * Service outstanding events. |
1543 | | */ |
1544 | 0 | if (num_events > 0) { |
1545 | 0 | DEBUG4("Servicing event(s)"); |
1546 | 0 | fr_event_service(worker->el); |
1547 | 0 | } |
1548 | | |
1549 | | /* |
1550 | | * Run any outstanding requests. |
1551 | | */ |
1552 | 0 | worker_run_request(worker, fr_time()); |
1553 | 0 | } |
1554 | 0 | } |
1555 | | |
1556 | | /** Pre-event handler |
1557 | | * |
1558 | | * This should be run ONLY in single-threaded mode! |
1559 | | */ |
1560 | | int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx) |
1561 | 0 | { |
1562 | 0 | fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t); |
1563 | 0 | request_t *request; |
1564 | |
|
1565 | 0 | request = fr_heap_peek(worker->runnable); |
1566 | 0 | if (!request) return 0; |
1567 | | |
1568 | | /* |
1569 | | * There's work to do. Tell the event handler to poll |
1570 | | * for IO / timers, but also immediately return to the |
1571 | | * calling function, which has more work to do. |
1572 | | */ |
1573 | 0 | return 1; |
1574 | 0 | } |
1575 | | |
1576 | | |
1577 | | /** Post-event handler |
1578 | | * |
1579 | | * This should be run ONLY in single-threaded mode! |
1580 | | */ |
1581 | | void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx) |
1582 | 0 | { |
1583 | 0 | fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t); |
1584 | |
|
1585 | 0 | worker_run_request(worker, fr_time()); /* Event loop time can be too old, and trigger asserts */ |
1586 | 0 | } |
1587 | | |
1588 | | /** Print debug information about the worker structure |
1589 | | * |
1590 | | * @param[in] worker the worker |
1591 | | * @param[in] fp the file where the debug output is printed. |
1592 | | */ |
1593 | | void fr_worker_debug(fr_worker_t *worker, FILE *fp) |
1594 | 0 | { |
1595 | 0 | WORKER_VERIFY; |
1596 | |
|
1597 | 0 | fprintf(fp, "\tnum_channels = %d\n", worker->num_channels); |
1598 | 0 | fprintf(fp, "\tstats.in = %" PRIu64 "\n", worker->stats.in); |
1599 | |
|
1600 | 0 | fprintf(fp, "\tcalculated (predicted) total CPU time = %" PRIu64 "\n", |
1601 | 0 | fr_time_delta_unwrap(worker->predicted) * worker->stats.in); |
1602 | 0 | if (worker->stats.in) { |
1603 | 0 | fprintf(fp, "\tcalculated (counted) per request time = %" PRIu64 "\n", |
1604 | 0 | fr_time_delta_unwrap(worker->tracking.running_total) / worker->stats.in); |
1605 | 0 | } |
1606 | |
|
1607 | 0 | fr_time_tracking_debug(&worker->tracking, fp); |
1608 | |
|
1609 | 0 | } |
1610 | | |
1611 | | /** Create a channel to the worker |
1612 | | * |
1613 | | * Called by the master (i.e. network) thread when it needs to create |
1614 | | * a new channel to a particuler worker. |
1615 | | * |
1616 | | * @param[in] worker the worker |
1617 | | * @param[in] master the control plane of the master |
1618 | | * @param[in] ctx the context in which the channel will be created |
1619 | | */ |
1620 | | fr_channel_t *fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master) |
1621 | 0 | { |
1622 | 0 | fr_channel_t *ch; |
1623 | 0 | pthread_t id; |
1624 | 0 | bool same; |
1625 | |
|
1626 | 0 | WORKER_VERIFY; |
1627 | |
|
1628 | 0 | id = pthread_self(); |
1629 | 0 | same = (pthread_equal(id, worker->thread_id) != 0); |
1630 | |
|
1631 | 0 | ch = fr_channel_create(ctx, master, worker->control, same); |
1632 | 0 | if (!ch) return NULL; |
1633 | | |
1634 | 0 | fr_channel_set_recv_request(ch, worker, worker_recv_request); |
1635 | | |
1636 | | /* |
1637 | | * Tell the worker about the channel |
1638 | | */ |
1639 | 0 | if (fr_channel_signal_open(ch) < 0) { |
1640 | 0 | talloc_free(ch); |
1641 | 0 | return NULL; |
1642 | 0 | } |
1643 | | |
1644 | 0 | return ch; |
1645 | 0 | } |
1646 | | |
1647 | | int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li) |
1648 | 0 | { |
1649 | 0 | fr_ring_buffer_t *rb; |
1650 | | |
1651 | | /* |
1652 | | * Skip a bunch of work if we're already in the worker thread. |
1653 | | */ |
1654 | 0 | if (is_worker_thread(worker)) { |
1655 | 0 | return fr_worker_listen_cancel_self(worker, li); |
1656 | 0 | } |
1657 | | |
1658 | 0 | rb = fr_worker_rb_init(); |
1659 | 0 | if (!rb) return -1; |
1660 | | |
1661 | 0 | return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN_DEAD, &li, sizeof(li)); |
1662 | 0 | } |
1663 | | |
#ifdef WITH_VERIFY_PTR
/** Verify the worker data structures.
 *
 * Asserts that the worker's owned objects are present, and checks via
 * talloc_get_type_abort() that the worker, its control plane, event list,
 * runnable heap, dedup tree and every open channel carry the expected
 * talloc type.
 *
 * @param[in] worker the worker
 */
static void worker_verify(fr_worker_t *worker)
{
	int ch_idx;

	/*
	 *	The worker itself, and its control-plane queue.
	 */
	(void) talloc_get_type_abort(worker, fr_worker_t);
	fr_atomic_queue_verify(worker->aq_control);

	/*
	 *	Owned objects: each must exist and be of the right type.
	 */
	fr_assert(worker->control != NULL);
	(void) talloc_get_type_abort(worker->control, fr_control_t);

	fr_assert(worker->el != NULL);
	(void) talloc_get_type_abort(worker->el, fr_event_list_t);

	fr_assert(worker->runnable != NULL);
	(void) talloc_get_type_abort(worker->runnable, fr_heap_t);

	fr_assert(worker->dedup != NULL);
	(void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);

	/*
	 *	Channel slots may be empty; only check the occupied ones.
	 */
	for (ch_idx = 0; ch_idx < worker->config.max_channels; ch_idx++) {
		if (worker->channel[ch_idx].ch != NULL) {
			(void) talloc_get_type_abort(worker->channel[ch_idx].ch, fr_channel_t);
		}
	}
}
#endif
1695 | | |
1696 | | int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats) |
1697 | 0 | { |
1698 | 0 | if (num < 0) return -1; |
1699 | 0 | if (num == 0) return 0; |
1700 | | |
1701 | 0 | stats[0] = worker->stats.in; |
1702 | 0 | if (num >= 2) stats[1] = worker->stats.out; |
1703 | 0 | if (num >= 3) stats[2] = worker->stats.dup; |
1704 | 0 | if (num >= 4) stats[3] = worker->stats.dropped; |
1705 | 0 | if (num >= 5) stats[4] = worker->num_naks; |
1706 | 0 | if (num >= 6) stats[5] = worker->num_active; |
1707 | |
|
1708 | 0 | if (num <= 6) return num; |
1709 | | |
1710 | 0 | return 6; |
1711 | 0 | } |
1712 | | |
1713 | | static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info) |
1714 | 0 | { |
1715 | 0 | fr_worker_t const *worker = ctx; |
1716 | 0 | fr_time_delta_t when; |
1717 | |
|
1718 | 0 | if ((info->argc == 0) || (strcmp(info->argv[0], "count") == 0)) { |
1719 | 0 | fprintf(fp, "count.in\t\t\t%" PRIu64 "\n", worker->stats.in); |
1720 | 0 | fprintf(fp, "count.out\t\t\t%" PRIu64 "\n", worker->stats.out); |
1721 | 0 | fprintf(fp, "count.dup\t\t\t%" PRIu64 "\n", worker->stats.dup); |
1722 | 0 | fprintf(fp, "count.dropped\t\t\t%" PRIu64 "\n", worker->stats.dropped); |
1723 | 0 | fprintf(fp, "count.naks\t\t\t%" PRIu64 "\n", worker->num_naks); |
1724 | 0 | fprintf(fp, "count.active\t\t\t%" PRIu64 "\n", worker->num_active); |
1725 | 0 | fprintf(fp, "count.runnable\t\t\t%u\n", fr_heap_num_elements(worker->runnable)); |
1726 | 0 | } |
1727 | |
|
1728 | 0 | if ((info->argc == 0) || (strcmp(info->argv[0], "cpu") == 0)) { |
1729 | 0 | when = worker->predicted; |
1730 | 0 | fprintf(fp, "cpu.request_time_rtt\t\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC); |
1731 | |
|
1732 | 0 | when = worker->tracking.running_total; |
1733 | 0 | if (fr_time_delta_ispos(when) && (worker->stats.in > worker->stats.dropped)) { |
1734 | 0 | when = fr_time_delta_div(when, fr_time_delta_wrap(worker->stats.in - worker->stats.dropped)); |
1735 | 0 | } |
1736 | 0 | fprintf(fp, "cpu.average_request_time\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC); |
1737 | |
|
1738 | 0 | when = worker->tracking.running_total; |
1739 | 0 | fprintf(fp, "cpu.used\t\t\t%.6f\n", fr_time_delta_unwrap(when) / (double)NSEC); |
1740 | |
|
1741 | 0 | when = worker->tracking.waiting_total; |
1742 | 0 | fprintf(fp, "cpu.waiting\t\t\t%.3f\n", fr_time_delta_unwrap(when) / (double)NSEC); |
1743 | |
|
1744 | 0 | fr_time_elapsed_fprint(fp, &worker->cpu_time, "cpu.requests", 4); |
1745 | 0 | fr_time_elapsed_fprint(fp, &worker->wall_clock, "time.requests", 4); |
1746 | 0 | } |
1747 | |
|
1748 | 0 | return 0; |
1749 | 0 | } |
1750 | | |
1751 | | fr_cmd_table_t cmd_worker_table[] = { |
1752 | | { |
1753 | | .parent = "stats", |
1754 | | .name = "worker", |
1755 | | .help = "Statistics for workers threads.", |
1756 | | .read_only = true |
1757 | | }, |
1758 | | |
1759 | | { |
1760 | | .parent = "stats worker", |
1761 | | .add_name = true, |
1762 | | .name = "self", |
1763 | | .syntax = "[(count|cpu)]", |
1764 | | .func = cmd_stats_worker, |
1765 | | .help = "Show statistics for a specific worker thread.", |
1766 | | .read_only = true |
1767 | | }, |
1768 | | |
1769 | | CMD_TABLE_END |
1770 | | }; |