Coverage Report

Created: 2026-06-30 07:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/freeradius-server/src/lib/io/worker.c
Line
Count
Source
1
/*
2
 *   This program is free software; you can redistribute it and/or modify
3
 *   it under the terms of the GNU General Public License as published by
4
 *   the Free Software Foundation; either version 2 of the License, or
5
 *   (at your option) any later version.
6
 *
7
 *   This program is distributed in the hope that it will be useful,
8
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *   GNU General Public License for more details.
11
 *
12
 *   You should have received a copy of the GNU General Public License
13
 *   along with this program; if not, write to the Free Software
14
 *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15
 */
16
17
 /**
18
 * $Id: c4af285a9dcfb80489a3ba2972595170571c8915 $
19
 *
20
 * @brief Worker thread functions.
21
 * @file io/worker.c
22
 *
23
 *  The "worker" thread is the one responsible for the bulk of the
24
 *  work done when processing a request.  Workers are spawned by the
25
 *  scheduler, and create a kqueue (KQ) and control-plane
26
 *  Atomic Queue (AQ) for control-plane communication.
27
 *
28
 *  When a network thread discovers that it needs more workers, it
29
 *  asks the scheduler for a KQ/AQ combination.  The network thread
30
 *  then creates a channel dedicated to that worker, and sends the
31
 *  channel to the worker in a "new channel" message.  The worker
32
 *  receives the channel, and sends an ACK back to the network thread.
33
 *
34
 *  The network thread then sends the worker new packets, which the
35
 *  worker receives and processes.
36
 *
37
 *  When a packet is decoded, it is put into the "runnable" heap, and
38
 *  also into the timeout sublist. The main loop fr_worker() then
39
 *  pulls new requests off of this heap and runs them.  The main event
40
 *  loop checks the head of the timeout sublist, and forcefully terminates
41
 *  any requests which have been running for too long.
42
 *
43
 *  If a request is yielded, it is placed onto the yielded list in
44
 *  the worker "tracking" data structure.
45
 *
46
 * @copyright 2016 Alan DeKok (aland@freeradius.org)
47
 */
48
49
RCSID("$Id: c4af285a9dcfb80489a3ba2972595170571c8915 $")
50
51
0
#define LOG_PREFIX worker->name
52
0
#define LOG_DST worker->log
53
54
#include <freeradius-devel/io/channel.h>
55
#include <freeradius-devel/io/listen.h>
56
#include <freeradius-devel/io/worker.h>
57
#include <freeradius-devel/unlang/base.h>
58
#include <freeradius-devel/util/minmax_heap.h>
59
#include <freeradius-devel/util/timer.h>
60
61
#include <stdalign.h>
62
63
#ifdef WITH_VERIFY_PTR
64
static void worker_verify(fr_worker_t *worker);
65
0
#define WORKER_VERIFY worker_verify(worker)
66
#else
67
#define WORKER_VERIFY
68
#endif
69
70
static _Atomic(uint64_t) request_number = 0;
71
72
FR_SLAB_TYPES(request, request_t)
73
0
FR_SLAB_FUNCS(request, request_t)
Unexecuted instantiation: worker.c:request_slab_release
Unexecuted instantiation: worker.c:request_slab_list_alloc
Unexecuted instantiation: worker.c:request_slab_init
Unexecuted instantiation: worker.c:_request_slab_cleanup
Unexecuted instantiation: worker.c:request_slab_reserve
Unexecuted instantiation: worker.c:request_slab_element_init
74
75
static _Thread_local fr_ring_buffer_t *fr_worker_rb;
76
77
/** Per-channel bookkeeping held by the worker
 *
 * One slot of the worker->channel array.  worker_channel_callback() treats a slot
 * whose ch is NULL as free.
 */
typedef struct {
78
  fr_channel_t    *ch;    //!< channel occupying this slot, NULL when the slot is unused
79
80
  /*
81
   *  To save time, we don't care about num_elements here.  Which means that we don't
82
   *  need to cache or lookup the fr_worker_listen_t when we free a request.
83
   *
   *  The list is initialised in worker_channel_callback() and drained by
   *  worker_requests_cancel() when the channel is closed.
   *
   *  NOTE(review): the sentence above is identical to the one in fr_worker_listen_t
   *  and names that type rather than fr_worker_channel_t - confirm which was meant.
   */
84
  fr_dlist_head_t   dlist;
85
} fr_worker_channel_t;
86
87
/**
88
 *  A worker which takes packets from a master, and processes them.
 *
 *  Holds the per-thread state used by the functions in this file: the control
 *  plane, the channel slots, the runnable heap, the timeout timer list, the
 *  dedup and listener trees, and assorted statistics.
89
 */
90
struct fr_worker_s {
91
  char const    *name;    //!< name of this worker
92
  fr_worker_config_t  config;   //!< external configuration
93
94
  unlang_interpret_t  *intp;    //!< Worker's local interpreter.
95
96
  pthread_t   thread_id;  //!< my thread ID
97
98
  fr_log_t const    *log;   //!< log destination
99
  fr_log_lvl_t    lvl;    //!< log level
100
101
  fr_atomic_queue_t *aq_control;  //!< atomic queue for control messages sent to me
102
103
  fr_control_t    *control; //!< the control plane
104
105
  fr_event_list_t   *el;    //!< our event list
106
107
  int     num_channels; //!< number of channel slots currently in use
108
109
  fr_heap_t         *runnable;  //!< current runnable requests which we've spent time processing
110
111
  fr_timer_list_t   *timeout;   //!< Timer list holding the max_request_time timer of each tracked request.
112
  fr_time_delta_t   max_request_time; //!< maximum time a request can be processed
113
114
  fr_rb_tree_t    *dedup;   //!< de-dup tree
115
116
  fr_rb_tree_t    *listeners;     //!< so we can cancel requests when a listener goes away
117
118
  fr_io_stats_t   stats;    //!< input / output stats
119
  fr_time_elapsed_t cpu_time; //!< histogram of total CPU time per request
120
  fr_time_elapsed_t wall_clock; //!< histogram of wall clock time per request
121
122
  uint64_t        num_naks; //!< number of messages which were nak'd
123
  uint64_t        num_active; //!< number of active requests
124
125
  fr_time_delta_t   predicted;  //!< How long we predict a request will take to execute.
126
  fr_time_tracking_t  tracking; //!< how much time the worker has spent doing things.
127
128
  bool      was_sleeping; //!< used to suppress multiple sleep signals in a row
129
  bool      exiting;  //!< are we exiting?
130
131
  fr_worker_channel_t *channel; //!< array of channel slots, scanned up to config.max_channels
132
133
  request_slab_list_t *slab;    //!< slab allocator for request_t
134
};
135
136
/** Requests grouped by the listener they arrived on
 *
 * Entries live in worker->listeners, keyed by the listener pointer.  They are created
 * on demand in worker_request_bootstrap() and removed by fr_worker_listen_cancel_self().
 */
typedef struct {
137
  fr_listen_t const *listener;  //!< incoming packets
138
139
  fr_rb_node_t    node;   //!< in tree of listeners
140
141
  /*
142
   *  To save time, we don't care about num_elements here.  Which means that we don't
143
   *  need to cache or lookup the fr_worker_listen_t when we free a request.
144
   */
145
  fr_dlist_head_t   dlist;    //!< of requests associated with this listener.
146
} fr_worker_listen_t;
147
148
149
/** Order two fr_worker_listen_t entries by the address of the listener they track
 *
 * @param[in] one first fr_worker_listen_t.
 * @param[in] two second fr_worker_listen_t.
 * @return the result of CMP() on the two listener pointers.
 */
static int8_t worker_listener_cmp(void const *one, void const *two)
150
0
{
151
0
  fr_worker_listen_t const *a = one, *b = two;
152
153
0
  return CMP(a->listener, b->listener);
154
0
}
155
156
157
/*
158
 *  Explicitly cleanup the memory allocated to the ring buffer,
159
 *  just in case valgrind complains about it.
 *
 *  Registered as the thread-exit handler by fr_worker_rb_init().
 *  "arg" is the ring buffer; the return value is that of talloc_free().
160
 */
161
static int _fr_worker_rb_free(void *arg)
162
0
{
163
0
  return talloc_free(arg);
164
0
}
165
166
/** Initialise thread local storage
167
 *
 * Returns the ring buffer already cached in fr_worker_rb for this thread, or
 * creates one of FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE bytes and registers
 * _fr_worker_rb_free() to release it.
 *
168
 * @return fr_ring_buffer_t for messages, or NULL if the buffer could not be allocated.
169
 */
170
static inline fr_ring_buffer_t *fr_worker_rb_init(void)
171
0
{
172
0
  fr_ring_buffer_t *rb;
173
174
0
  rb = fr_worker_rb;
175
0
  if (rb) return rb;
176
177
0
  rb = fr_ring_buffer_create(NULL, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
178
0
  if (!rb) {
179
0
    fr_perror("Failed allocating memory for worker ring buffer");
180
0
    return NULL;
181
0
  }
182
183
  /* NOTE(review): presumably this also stores rb in fr_worker_rb - confirm against the macro */
0
  fr_atexit_thread_local(fr_worker_rb, _fr_worker_rb_free, rb);
184
185
0
  return rb;
186
0
}
187
188
/** Check whether the calling thread is the one recorded in worker->thread_id
 *
 * @param[in] worker the worker to check against.
 * @return true if pthread_self() equals worker->thread_id, false otherwise.
 */
static inline bool is_worker_thread(fr_worker_t const *worker)
189
0
{
190
0
  return (pthread_equal(pthread_self(), worker->thread_id) != 0);
191
0
}
192
193
/*
 *  Forward declarations.  Parameter names match the definitions below; in
 *  particular the bool passed to worker_send_reply() is true when a reply
 *  should be encoded.
 */
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now);
194
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now);
195
196
/** Callback which handles a message being received on the worker side.
197
 *
 * Counts the message in worker->stats.in, records the channel it arrived on in
 * the message, and hands it to worker_request_bootstrap() with the current time.
 *
198
 * @param[in] ctx the worker
199
 * @param[in] ch the channel the message arrived on
200
 * @param[in] cd the message; it is dereferenced unconditionally, so must not be NULL
201
 */
202
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
203
0
{
204
0
  fr_worker_t *worker = ctx;
205
206
0
  worker->stats.in++;
207
0
  DEBUG3("Received request %" PRIu64 "", worker->stats.in);
208
0
  cd->channel.ch = ch;
209
0
  worker_request_bootstrap(worker, cd, fr_time());
210
0
}
211
212
/** Signal every entry on a channel's list to cancel
 *
 * Pops entries from the head of ch->dlist until it is empty, sending
 * FR_SIGNAL_CANCEL to each one.
 *
 * NOTE(review): the list is initialised with fr_async_t / entry in
 * worker_channel_callback(), but the popped pointer is used as a request_t
 * here - confirm the intended element type.
 *
 * @param[in] ch the worker's channel slot being closed.
 */
static void worker_requests_cancel(fr_worker_channel_t *ch)
213
0
{
214
0
  request_t *request;
215
216
0
  while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) {
217
0
    unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
218
0
  }
219
0
}
220
221
/** Mark the worker as exiting and remove its post-event callback
 *
 * Called from worker_channel_callback() once the last channel has closed.
 *
 * @param[in] worker the worker which is shutting down.
 */
static void worker_exit(fr_worker_t *worker)
222
0
{
223
0
  worker->exiting = true;
224
225
  /*
226
   *  Don't allow the post event to run
227
   *  any more requests.  They'll be
228
   *  signalled to stop before we exit.
229
   *
230
   *  This only has an effect in single
231
   *  threaded mode.
232
   */
233
0
  (void)fr_event_post_delete(worker->el, fr_worker_post_event, worker);
234
0
}
235
236
/** Handle a control plane message sent to the worker via a channel
237
 *
 * The message is decoded by fr_channel_service_message() and the resulting
 * event is dispatched:
 *  - ERROR / EMPTY / NOOP: nothing to do.
 *  - DATA_READY_REQUESTOR: not expected on the worker side (asserts).
 *  - DATA_READY_RESPONDER: receive requests from the channel until none are left.
 *  - OPEN: store the channel in the first free slot and attach a new message set.
 *  - CLOSE: cancel the slot's entries, ACK the close, free the message set,
 *    release the slot, and exit the worker if no channels remain.
 *
238
 * @param[in] ctx the worker
239
 * @param[in] data  the message
240
 * @param[in] data_size size of the data
241
 * @param[in] now the current time
242
 */
243
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
244
0
{
245
0
  int     i;
246
0
  bool      ok, was_sleeping;
247
0
  fr_channel_t    *ch;
248
0
  fr_message_set_t  *ms;
249
0
  fr_channel_event_t  ce;
250
0
  fr_worker_t   *worker = ctx;
251
252
  /* Remember the previous state so it can be restored if there turns out to be no work */
0
  was_sleeping = worker->was_sleeping;
253
0
  worker->was_sleeping = false;
254
255
  /*
256
   *  We were woken up by a signal to do something.  We're
257
   *  not sleeping.
258
   */
259
0
  ce = fr_channel_service_message(now, &ch, data, data_size);
260
0
  DEBUG3("Channel %s",
261
0
         fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
262
0
  switch (ce) {
263
0
  case FR_CHANNEL_ERROR:
264
0
    return;
265
266
0
  case FR_CHANNEL_EMPTY:
267
0
    return;
268
269
0
  case FR_CHANNEL_NOOP:
270
0
    return;
271
272
0
  case FR_CHANNEL_DATA_READY_REQUESTOR:
273
0
    fr_assert(0 == 1);
274
0
    break;
275
276
0
  case FR_CHANNEL_DATA_READY_RESPONDER:
277
0
    fr_assert(ch != NULL);
278
279
    /*
     *  If the first receive yields nothing, restore the saved
     *  sleeping flag.  Otherwise keep receiving until the call
     *  reports that nothing is left.
     */
0
    if (!fr_channel_recv_request(ch)) {
280
0
      worker->was_sleeping = was_sleeping;
281
282
0
    } else while (fr_channel_recv_request(ch));
283
0
    break;
284
285
0
  case FR_CHANNEL_OPEN:
286
0
    fr_assert(ch != NULL);
287
288
0
    ok = false;
289
    /* Find the first unused slot, asserting the channel isn't already registered */
0
    for (i = 0; i < worker->config.max_channels; i++) {
290
0
      fr_assert(worker->channel[i].ch != ch);
291
292
0
      if (worker->channel[i].ch != NULL) continue;
293
294
0
      worker->channel[i].ch = ch;
295
0
      fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);
296
297
0
      DEBUG3("Received channel %p into array entry %d", ch, i);
298
299
0
      ms = fr_message_set_create(worker, worker->config.message_set_size,
300
0
               sizeof(fr_channel_data_t),
301
0
               worker->config.ring_buffer_size, false);
302
0
      fr_assert(ms != NULL);
303
0
      fr_channel_responder_uctx_add(ch, ms);
304
305
0
      worker->num_channels++;
306
0
      ok = true;
307
0
      break;
308
0
    }
309
310
0
    fr_cond_assert(ok);
311
0
    break;
312
313
0
  case FR_CHANNEL_CLOSE:
314
0
    fr_assert(ch != NULL);
315
316
0
    ok = false;
317
318
    /*
319
     *  Locate the signalling channel in the list
320
     *  of channels.
321
     */
322
0
    for (i = 0; i < worker->config.max_channels; i++) {
323
0
      if (!worker->channel[i].ch) continue;
324
325
0
      if (worker->channel[i].ch != ch) continue;
326
327
0
      worker_requests_cancel(&worker->channel[i]);
328
329
0
      ms = fr_channel_responder_uctx_get(ch);
330
331
0
      fr_assert_msg(fr_dlist_num_elements(&worker->channel[i].dlist) == 0,
332
0
              "Network added messages to channel after sending FR_CHANNEL_CLOSE");
333
334
0
      fr_channel_responder_ack_close(ch);
335
0
      fr_assert(ms != NULL);
336
0
      fr_message_set_gc(ms);
337
0
      talloc_free(ms);
338
339
0
      worker->channel[i].ch = NULL;
340
341
0
      fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
342
0
      fr_assert(worker->num_channels > 0);
343
344
0
      worker->num_channels--;
345
0
      ok = true;
346
0
      break;
347
0
    }
348
349
0
    fr_cond_assert(ok);
350
351
    /*
352
     *  Our last input channel closed,
353
     *  time to die.
354
     */
355
0
    if (worker->num_channels == 0) worker_exit(worker);
356
0
    break;
357
0
  }
358
0
}
359
360
/** Cancel every request tracked for a listener, then forget the listener
 *
 * Looks up the fr_worker_listen_t for li in worker->listeners, signals each
 * request on its list with FR_SIGNAL_CANCEL, then removes and frees the entry.
 *
 * @param[in] worker the worker which owns the listener tree.
 * @param[in] li the listener which is going away.
 * @return
 *  - 0 if the listener was found and its requests were signalled.
 *  - -1 if the worker has no entry for this listener.
 */
static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
361
0
{
362
0
  fr_worker_listen_t *wl;
363
0
  request_t *request;
364
365
0
  wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = li });
366
0
  if (!wl) return -1;
367
368
0
  while ((request = fr_dlist_pop_head(&wl->dlist)) != NULL) {
369
0
    RERROR("Cancelling request due to socket being closed");
370
0
    unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
371
0
  }
372
373
0
  (void) fr_rb_delete(worker->listeners, wl);
374
0
  talloc_free(wl);
375
376
0
  return 0;
377
0
}
378
379
380
/** A socket is going away, so clean up any requests which use this socket.
381
 *
 * The message payload is a single fr_listen_t pointer, which is copied out of
 * the buffer and passed to fr_worker_listen_cancel_self().  The result of that
 * call is ignored.
 *
382
 * @param[in] ctx the worker
383
 * @param[in] data  the message, holding a fr_listen_t pointer
384
 * @param[in] data_size size of the data, asserted to be sizeof(fr_listen_t const *)
385
 * @param[in] now the current time (unused)
386
 */
387
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
388
0
{
389
0
  fr_listen_t const *li;
390
0
  fr_worker_t   *worker = ctx;
391
392
0
  fr_assert(data_size == sizeof(li));
393
394
0
  memcpy(&li, data, sizeof(li));
395
396
0
  (void) fr_worker_listen_cancel_self(worker, li);
397
0
}
398
399
/** Send a NAK to the network thread
400
 *
401
 * The network thread believes that a worker is running a request until that request has been NAK'd.
402
 * We typically NAK requests when they've been hanging around in the worker's backlog too long,
403
 * or there was an error executing the request.
404
 *
 * The reply is reserved from the channel's message set, filled in by the
 * listener's nak() callback when it has one, and sent back on the channel the
 * message arrived on.  The original message is marked as done on every path.
 *
405
 * @param[in] worker  the worker
406
 * @param[in] cd  the message to NAK
407
 * @param[in] now when the message is NAKd
408
 */
409
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
410
0
{
411
0
  size_t      size;
412
0
  fr_channel_data_t *reply;
413
0
  fr_channel_t    *ch;
414
0
  fr_message_set_t  *ms;
415
0
  fr_listen_t   *listen;
416
417
0
  worker->num_naks++;
418
419
  /*
420
   *  Cache the outbound channel.  We'll need it later.
421
   */
422
0
  ch = cd->channel.ch;
423
0
  listen = cd->listen;
424
425
  /*
426
   *  If the channel has been closed, but we haven't
427
   *  been informed, that is extremely bad.
428
   *
429
   *  Try to continue working... but we'll likely
430
   *  leak memory or SEGV soon.
431
   */
432
0
  if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send NAK but channel has been closed")) {
433
0
    fr_message_done(&cd->m);
434
0
    return;
435
0
  }
436
437
0
  ms = fr_channel_responder_uctx_get(ch);
438
0
  fr_assert(ms != NULL);
439
440
  /* Prefer the listener's reply size, falling back to its message size when that is zero */
0
  size = listen->app_io->default_reply_size;
441
0
  if (!size) size = listen->app_io->default_message_size;
442
443
  /*
444
   *  Allocate a default message size.
445
   */
446
0
  MEM(reply = (fr_channel_data_t *) fr_message_and_data_reserve(ms, size));
447
448
  /*
449
   *  Encode a NAK
450
   */
451
0
  if (listen->app_io->nak) {
452
0
    size = listen->app_io->nak(listen, cd->packet_ctx, cd->m.data,
453
0
             cd->m.data_size, reply->m.data, reply->m.rb_size);
454
0
  } else {
455
0
    size = 1; /* no nak() callback: commit a single byte and let the network side work it out */
456
0
  }
457
458
0
  (void) fr_message_and_data_commit(ms, &reply->m, size);
459
460
  /*
461
   *  Fill in the NAK.
462
   */
463
0
  reply->m.when = now;
464
0
  reply->reply.cpu_time = worker->tracking.running_total;
465
0
  reply->reply.processing_time = fr_time_delta_from_msec(1); /* @todo - set to something better? */
466
0
  reply->reply.request_time = cd->request.recv_time;
467
468
0
  reply->listen = cd->listen;
469
0
  reply->packet_ctx = cd->packet_ctx;
470
471
  /*
472
   *  Mark the original message as done.
473
   */
474
0
  fr_message_done(&cd->m);
475
476
  /*
477
   *  Send the reply, which also polls the request queue.
478
   */
479
0
  if (fr_channel_send_reply(ch, reply) < 0) {
480
0
    DEBUG2("Failed sending reply to channel");
481
0
  }
482
483
0
  worker->stats.out++;
484
0
}
485
486
/** Signal the unlang interpreter that it needs to stop running the request
487
 *
488
 * Signalling is a synchronous operation.  Whatever I/O requests the request
489
 * is currently performing are immediately cancelled, and all the frames are
490
 * popped off the unlang stack.
491
 *
492
 * Modules and unlang keywords explicitly register signal handlers to deal
493
 * with their yield points being cancelled/interrupted via this function.
494
 *
495
 * The caller should assume the request is no longer viable after calling
496
 * this function.
497
 *
498
 * @param[in] request request to cancel.  The request may still run to completion.
499
 */
500
static void worker_stop_request(request_t *request)
501
0
{
502
  /*
503
   *  Also marks the request as done and runs
504
   *  the internal/external callbacks.
505
   */
506
0
  unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
507
0
}
508
509
/** Enforce max_request_time
510
 *
511
 * Timer callback armed for each request by worker_request_time_tracking_start(),
512
 * with a delay of worker->config.max_request_time.  When it fires, the request is
513
 * signalled to stop and its rcode is set to RLM_MODULE_TIMEOUT.
514
 *
515
 * @param[in] tl  the worker's timer list (unused).
516
 * @param[in] when  the current time (unused).
517
 * @param[in] uctx  the request_t timing out.
518
 */
519
static void _worker_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
520
0
{
521
0
  request_t *request = talloc_get_type_abort(uctx, request_t);
522
523
  /*
524
   *  Waiting too long, delete it.
525
   */
526
0
  REDEBUG("Request has reached max_request_time - signalling it to stop");
527
0
  worker_stop_request(request);
528
529
  /*
530
   *  This ensures the finally section can run timeout specific policies
   *
   *  NOTE(review): worker_stop_request() documents that callers should treat
   *  the request as no longer viable, yet it is written to here - confirm the
   *  request is guaranteed to still be allocated at this point.
531
   */
532
0
  request->rcode = RLM_MODULE_TIMEOUT;
533
0
}
534
535
536
/** Start time tracking for a request, and mark it as runnable.
537
 *
 * Arms the request's max_request_time timer, starts its time tracking in the
 * yielded state, counts it in worker->num_active, and inserts it into the
 * runnable heap.
 *
 * @param[in] worker the worker which will run the request.
 * @param[in] request the request to start tracking.
 * @param[in] now the current time.
 * @return
 *  - 0 on success.
 *  - -1 if the timeout timer could not be set; nothing else has been changed.
538
 */
539
static int worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
540
0
{
541
  /*
542
   *  New requests are inserted into the time order heap in
543
   *  strict time priority.  Once they are in the list, they
544
   *  are only removed when the request is done / free'd.
545
   */
546
0
  fr_assert(!fr_timer_armed(request->timeout));
547
548
0
  if (unlikely(fr_timer_in(request, worker->timeout, &request->timeout, worker->config.max_request_time,
549
0
         true, _worker_request_timeout, request) < 0)) {
550
0
    RERROR("Failed to set request timeout timer");
551
0
    return -1;
552
0
  }
553
554
  /*
555
   *  Bootstrap the async state machine with the initial
556
   *  state of the request.
557
   */
558
0
  RDEBUG3("Time tracking started in yielded state");
559
0
  fr_time_tracking_start(&worker->tracking, &request->async->tracking, now);
560
0
  fr_time_tracking_yield(&request->async->tracking, now);
561
0
  worker->num_active++;
562
563
0
  fr_assert(!fr_heap_entry_inserted(request->runnable));
564
0
  (void) fr_heap_insert(&worker->runnable, request);
565
566
0
  return 0;
567
0
}
568
569
/** Stop time tracking for a request
 *
 * Ends the request's time tracking (updating worker->predicted), decrements
 * worker->num_active, and frees the request's timeout timer.
 *
 * @param[in] worker the worker which ran the request.
 * @param[in] request the request which has finished.
 * @param[in] now the current time.
 */
static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
570
0
{
571
0
  RDEBUG3("Time tracking ended");
572
0
  fr_time_tracking_end(&worker->predicted, &request->async->tracking, now);
573
0
  fr_assert(worker->num_active > 0);
574
0
  worker->num_active--;
575
576
0
  TALLOC_FREE(request->timeout);  /* Disarm the request timer */
577
0
}
578
579
/** Send a response packet to the network side
580
 *
 * A reply message is always reserved and sent on the request's channel, so the
 * network side learns the request has finished.  The packet itself is only
 * encoded into that message when send_reply is true.
 *
581
 * @param[in] worker    This worker.
582
 * @param[in] request   we're sending a reply for.
583
 * @param[in] send_reply  whether the network side sends a reply
584
 * @param[in] now   The current time
585
 */
586
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now)
587
0
{
588
0
  fr_channel_data_t *reply;
589
0
  fr_channel_t *ch;
590
0
  fr_message_set_t *ms;
591
0
  size_t size = 1;
592
593
0
  REQUEST_VERIFY(request);
594
595
  /*
596
   *  If we're sending a reply, then it's no longer runnable.
597
   */
598
0
  fr_assert(!fr_heap_entry_inserted(request->runnable));
599
600
0
  if (send_reply) {
601
0
    size = request->async->listen->app_io->default_reply_size;
602
0
    if (!size) size = request->async->listen->app_io->default_message_size;
603
0
  }
604
605
  /*
606
   *  Allocate and send the reply.
607
   */
608
0
  ch = request->async->channel;
609
0
  fr_assert(ch != NULL);
610
611
  /*
612
   *  If the channel has been closed, but we haven't
613
   *  been informed, that is extremely bad.
614
   *
615
   *  Try to continue working... but we'll likely
616
   *  leak memory or SEGV soon.
617
   */
618
0
  if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send reply but channel has been closed")) {
619
0
    return;
620
0
  }
621
622
0
  ms = fr_channel_responder_uctx_get(ch);
623
0
  fr_assert(ms != NULL);
624
625
  /*
   *  The reply is dereferenced immediately below, so a failed
   *  reservation must not be allowed through.  Use MEM() as
   *  worker_nak() does, rather than an assertion which may be
   *  compiled out of non-debug builds.
   */
0
  MEM(reply = (fr_channel_data_t *) fr_message_and_data_reserve(ms, size));
626
627
628
  /*
629
   *  Encode it, if required.
630
   */
631
0
  if (send_reply) {
632
0
    ssize_t slen = 0;
633
0
    fr_listen_t const *listen = request->async->listen;
634
635
0
    if (listen->app_io->encode) {
636
0
      slen = listen->app_io->encode(listen->app_io_instance, request,
637
0
                  reply->m.data, reply->m.rb_size);
638
0
    } else if (listen->app->encode) {
639
0
      slen = listen->app->encode(listen->app_instance, request,
640
0
               reply->m.data, reply->m.rb_size);
641
0
    }
642
0
    if (slen < 0) {
643
0
      RPERROR("Failed encoding request");
644
      /* Fall back to a one byte reply holding zero */
0
      *reply->m.data = 0;
645
0
      slen = 1;
646
0
    }
647
648
    /*
649
     *  Shrink the buffer to the actual packet size.
650
     *
651
     *  This will ALWAYS return the same message as we put in.
652
     */
653
0
    fr_assert((size_t) slen <= reply->m.rb_size);
654
0
    (void) fr_message_and_data_commit(ms, &reply->m, slen);
655
0
  }
656
657
  /*
658
   *  Fill in the rest of the fields in the channel message.
659
   *
660
   *  sequence / ack will be filled in by fr_channel_send_reply()
661
   */
662
0
  reply->m.when = now;
663
0
  reply->reply.cpu_time = worker->tracking.running_total;
664
0
  reply->reply.processing_time = request->async->tracking.running_total;
665
0
  reply->reply.request_time = request->async->recv_time;
666
667
0
  reply->listen = request->async->listen;
668
0
  reply->packet_ctx = request->async->packet_ctx;
669
670
  /*
671
   *  Update the various timers.
672
   */
673
0
  fr_time_elapsed_update(&worker->cpu_time, now, fr_time_add(now, reply->reply.processing_time));
674
0
  fr_time_elapsed_update(&worker->wall_clock, reply->reply.request_time, now);
675
676
0
  RDEBUG("Finished request");
677
678
  /*
679
   *  Send the reply, which also polls the request queue.
680
   */
681
0
  if (fr_channel_send_reply(ch, reply) < 0) {
682
    /*
683
     *  Should only happen if the TO_REQUESTOR
684
     *  channel is full, or it's not yet active.
685
     *
686
     *  Not much we can do except complain
687
     *  loudly and cleanup the request.
688
     */
689
0
    RPERROR("Failed sending reply to network thread");
690
0
  }
691
692
0
  worker->stats.out++;
693
694
0
  fr_assert(!fr_timer_armed(request->timeout));
695
0
  fr_assert(!fr_heap_entry_inserted(request->runnable));
696
697
0
  fr_dlist_entry_unlink(&request->listen_entry);
698
699
  /* Debug builds clear the transport fields so that any later use of them stands out */
0
#ifndef NDEBUG
700
0
  request->async->el = NULL;
701
0
  request->async->channel = NULL;
702
0
  request->async->packet_ctx = NULL;
703
0
  request->async->listen = NULL;
704
0
#endif
705
0
}
706
707
/*
708
 *  talloc_typed_asprintf() is horrifically slow for printing
709
 *  simple numbers.
 *
 *  Formats "number" as a decimal string, returning a copy made with
 *  talloc_strdup() in "ctx".  Digits are written from the end of a
 *  local buffer towards the start; zero produces the string "0".
710
 */
711
static char *itoa_internal(TALLOC_CTX *ctx, uint64_t number)
712
0
{
713
0
  char buffer[32];
714
0
  char *p;
715
0
  char const *numbers = "0123456789";
716
717
  /* Terminate at buffer[30], then fill in digits backwards from buffer[29] */
0
  p = buffer + 30;
718
0
  *(p--) = '\0';
719
720
0
  while (number > 0) {
721
0
    *(p--) = numbers[number % 10];
722
0
    number /= 10;
723
0
  }
724
725
  /* p sits one before the first digit; p[1] is the terminator only when no digit was written */
0
  if (p[1]) return talloc_strdup(ctx, p + 1);
726
727
0
  return talloc_strdup(ctx, "0");
728
0
}
729
730
/** Initialize various request fields needed by the worker.
731
 *
 * Allocates the packet and reply if they are missing, stamps the packet with
 * "now", and allocates the request's async data with its receive time and
 * event list set.  All three allocations abort via MEM() on failure, as the
 * async data is dereferenced immediately after being allocated.
 *
 * @param[in] worker the worker whose event list the request will use.
 * @param[in] request the request to initialise.
 * @param[in] now the current time.
732
 */
733
static inline CC_HINT(always_inline)
734
void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
735
{
736
  /*
737
   *  For internal requests request->packet
738
   *  and request->reply are already populated.
739
   */
740
  if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false));
741
  if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false));
742
743
  request->packet->timestamp = now;
744
  MEM(request->async = talloc_zero(request, fr_async_t));
745
  request->async->recv_time = now;
746
  request->async->el = worker->el;
747
  fr_dlist_entry_init(&request->async->entry);
748
}
749
750
/** Give a request its number, and a name which is that number in decimal
 *
 * The number is taken from the file-wide atomic request_number counter.  Any
 * existing name is freed before the new one is allocated under the request.
 *
 * @param[in] request the request to name.
 */
static inline CC_HINT(always_inline)
751
void worker_request_name_number(request_t *request)
752
0
{
753
0
  request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst);
754
0
  if (request->name) talloc_const_free(request->name);
755
0
  request->name = itoa_internal(request, request->number);
756
0
}
757
758
/** Count the requests the worker is currently tracking
 *
 * Each tracked request has a timer in worker->timeout, so the number of events
 * in that timer list is used as the request count.
 *
 * @param[in] worker the worker to query.
 * @return the number of events in the worker's timeout timer list.
 */
static inline CC_HINT(always_inline)
759
uint32_t worker_num_requests(fr_worker_t *worker)
760
0
{
761
0
  return fr_timer_list_num_events(worker->timeout);
762
0
}
763
764
/** Slab destructor for requests allocated by the worker
 *
 * Installed by worker_request_bootstrap(); simply forwards to request_slab_deinit().
 *
 * @param[in] request the request being released.
 * @param[in] uctx unused.
 * @return the result of request_slab_deinit().
 */
static int _worker_request_deinit(request_t *request, UNUSED void *uctx)
765
0
{
766
0
  return request_slab_deinit(request);
767
0
}
768
769
/** Turn a message received from the network side into a runnable request
 *
 * Reserves a request from the slab, initialises it, decodes the packet into it,
 * pushes the virtual server's entry point, handles duplicate / conflicting
 * packets when the listener asks for that, starts time tracking, and finally
 * records the request against its listener.
 *
 * If the worker is at max_requests, or allocation, initialisation, decoding,
 * pushing the entry point or starting time tracking fails, the message is
 * NAK'd via worker_nak() instead.
 *
 * @param[in] worker the worker receiving the message.
 * @param[in] cd the message holding the packet to decode.
 * @param[in] now the current time.
 */
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
770
{
771
  int     ret = -1;
772
  request_t   *request;
773
  fr_listen_t   *listen = cd->listen;
774
775
  if (worker_num_requests(worker) >= (uint32_t) worker->config.max_requests) {
776
    RATE_LIMIT_GLOBAL(ERROR, "Worker at max requests");
777
    goto nak;
778
  }
779
780
  /*
781
   *  Receive a message to the worker queue, and decode it
782
   *  to a request.
783
   */
784
  fr_assert(listen != NULL);
785
786
  request = request_slab_reserve(worker->slab);
787
  if (!request) {
788
    RATE_LIMIT_GLOBAL(ERROR, "Worker failed allocating new request");
789
    goto nak;
790
  }
791
  /*
792
   *  Ensures that both the deinit function runs AND
793
   *  the request is returned to the slab if something
794
   *  calls talloc_free() on it.
795
   */
796
  request_slab_element_set_destructor(request, _worker_request_deinit, worker);
797
798
  /*
799
   *  Have to initialise the request manually because namespace
800
   *  changes based on the listener that allocated it.
801
   */
802
  if (request_init(request, REQUEST_TYPE_EXTERNAL, (&(request_init_args_t){ .namespace = listen->dict })) < 0) {
803
    request_slab_release(request);
804
    goto nak;
805
  }
806
807
  /*
808
   *  Do normal worker init that's shared between internal
809
   *  and external requests.
810
   */
811
  worker_request_init(worker, request, now);
812
  worker_request_name_number(request);
813
814
  /*
815
   *  Associate our interpreter with the request
816
   */
817
  unlang_interpret_set(request, worker->intp);
818
819
  request->packet->timestamp = cd->request.recv_time; /* Legacy - Remove once everything looks at request->async */
820
821
  /*
822
   *  Update the transport-specific fields.
823
   */
824
  request->async->channel = cd->channel.ch;
825
826
  request->async->recv_time = cd->request.recv_time;
827
828
  request->async->listen = listen;
829
  request->async->packet_ctx = cd->packet_ctx;
830
  request->priority = cd->priority;
831
832
  /*
833
   *  Now that the "request" structure has been initialized, go decode the packet.
834
   *
835
   *  Note that this also sets the "async process" function.
   *
   *  If neither decode callback exists, ret keeps its initial
   *  value of -1 and the message is NAK'd below.
836
   */
837
  if (listen->app->decode) {
838
    ret = listen->app->decode(listen->app_instance, request, cd->m.data, cd->m.data_size);
839
  } else if (listen->app_io->decode) {
840
    ret = listen->app_io->decode(listen->app_io_instance, request, cd->m.data, cd->m.data_size);
841
  }
842
843
  if (ret < 0) {
844
  fail:
845
    fr_assert(talloc_parent(request->stack) == request);
846
    request_slab_release(request);
847
848
  nak:
849
    worker_nak(worker, cd, now);
850
    return;
851
  }
852
853
  /*
854
   *  Set the entry point for this virtual server.
855
   */
856
  if (unlang_call_push(NULL, request, cd->listen->server_cs, UNLANG_TOP_FRAME) < 0) {
857
    RERROR("Protocol failed to set 'process' function");
858
    goto fail;
859
  }
860
861
  /*
862
   *  Look for conflicting / duplicate packets, but only if
863
   *  requested to do so.
864
   */
865
  if (request->async->listen->track_duplicates) {
866
    request_t *old;
867
868
    old = fr_rb_find(worker->dedup, request);
869
    if (!old) {
870
      goto insert_new;
871
    }
872
873
    fr_assert(old->async->listen == request->async->listen);
874
    fr_assert(old->async->channel == request->async->channel);
875
876
    /*
877
     *  There's a new packet.  Do we keep the old one,
878
     *  or the new one?  This decision is made by
879
     *  checking the recv_time, which is a
880
     *  nanosecond-resolution timer.  If the time is
881
     *  identical, then the new packet is the same as
882
     *  the old one.
883
     *
884
     *  If the new packet is a duplicate of the old
885
     *  one, then we can just discard the new one.  We
886
     *  have to tell the channel that we've "eaten"
887
     *  this reply, so the sequence number should
888
     *  increase.
889
     *
890
     *  @todo - fix the channel code to do queue
891
     *  depth, and not sequence / ack.
892
     */
893
    if (fr_time_eq(old->async->recv_time, request->async->recv_time)) {
894
      RWARN("Discarding duplicate of request (%"PRIu64")", old->number);
895
896
      fr_channel_null_reply(request->async->channel);
897
      request_slab_release(request);
898
899
      /*
900
       *  Signal there's a dup, and ignore the
901
       *  return code.  We don't bother replying
902
       *  here, as an FD event or timer will
903
       *  wake up the request, and cause it to
904
       *  continue.
905
       *
906
       *  @todo - the old request is NOT
907
       *  running, but is yielded.  It MAY clean
908
       *  itself up, or do something...
909
       */
910
      unlang_interpret_signal(old, FR_SIGNAL_DUP);
911
      worker->stats.dup++;
912
913
      fr_message_done(&cd->m);
914
      return;
915
    }
916
917
    /*
918
     *  Stop the old request, and decrement the number
919
     *  of active requests.
920
     */
921
    RWARN("Got conflicting packet for request (%" PRIu64 "), telling old request to stop", old->number);
922
923
    worker_stop_request(old);
924
    worker->stats.dropped++;
925
    (void) fr_rb_remove(worker->dedup, old); /* remove, but do NOT free it */
926
927
  insert_new:
928
    (void) fr_rb_insert(worker->dedup, request);
929
  }
930
931
  if (worker_request_time_tracking_start(worker, request, now) < 0) {
932
    /* Undo the dedup insertion above before releasing the request */
    if (request->async->listen->track_duplicates) (void) fr_rb_remove(worker->dedup, request);
933
    goto fail;
934
  }
935
936
  /*
937
   *  We're done with this message.
938
   */
939
  fr_message_done(&cd->m);
940
941
  /*
   *  Track the request against its listener, creating the
   *  listener's entry the first time a request arrives from it.
   */
  {
942
    fr_worker_listen_t *wl;
943
944
    wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = listen });
945
    if (!wl) {
946
      MEM(wl = talloc_zero(worker, fr_worker_listen_t));
947
      fr_dlist_init(&wl->dlist, request_t, listen_entry);
948
      wl->listener = listen;
949
950
      (void) fr_rb_insert(worker->listeners, wl);
951
    }
952
953
    fr_dlist_insert_tail(&wl->dlist, request);
954
  }
955
}
956
957
/**
958
 *  Track a request_t in the "runnable" heap.
959
 *  Requests are compared by priority, then by sequence, then by receive time.
 *
 *  NOTE(review): this header used to say "followed by lower sequence numbers",
 *  while the comment in the body says requests further along in their
 *  processing sequence are preferred - confirm against CMP_PREFER_LARGER().
 *  The priority comparison also passes (b, a) where the sequence comparison
 *  passes (a, b); verify that both orderings are intended.
960
 */
961
static int8_t worker_runnable_cmp(void const *one, void const *two)
962
0
{
963
0
  request_t const *a = one, *b = two;
964
0
  int ret;
965
966
  /*
967
   *  Prefer higher priority packets.
968
   */
969
0
  ret = CMP_PREFER_LARGER(b->priority, a->priority);
970
0
  if (ret != 0) return ret;
971
972
  /*
973
   *  Prefer packets which are further along in their processing sequence.
974
   */
975
0
  ret = CMP_PREFER_LARGER(a->sequence, b->sequence);
976
0
  if (ret != 0) return ret;
977
978
  /*
979
   *  Smaller timestamp (i.e. earlier) is more important.
980
   */
981
0
  return fr_time_cmp(a->async->recv_time, b->async->recv_time);
982
0
}
983
984
/**
985
 *  Track a request_t in the "dedup" tree
986
 */
987
static int8_t worker_dedup_cmp(void const *one, void const *two)
988
0
{
989
0
  int ret;
990
0
  request_t const *a = one, *b = two;
991
992
0
  ret = CMP(a->async->listen, b->async->listen);
993
0
  if (ret) return ret;
994
995
0
  return CMP(a->async->packet_ctx, b->async->packet_ctx);
996
0
}
997
998
/** Destroy a worker
999
 *
1000
 * The input channels are signaled, and local messages are cleaned up.
1001
 *
1002
 * This should be called to _EXPLICITLY_ destroy a worker, when some fatal
1003
 * error has occurred on the worker side, and we need to destroy it.
1004
 *
1005
 * We signal all pending requests in the backlog to stop, and tell the
1006
 * network side that it should not send us any more requests.
1007
 *
1008
 * @param[in] worker the worker to destroy.
1009
 */
1010
void fr_worker_destroy(fr_worker_t *worker)
1011
0
{
1012
0
  int i, count, ret;
1013
1014
//  WORKER_VERIFY;
1015
1016
  /*
1017
   *  Stop any new requests running with this interpreter
1018
   */
1019
0
  unlang_interpret_set_thread_default(NULL);
1020
1021
  /*
1022
   *  Destroy all of the active requests.  These are ones
1023
   *  which are still waiting for timers or file descriptor
1024
   *  events.
1025
   */
1026
0
  count = 0;
1027
1028
  /*
1029
   *  Force the timeout event to fire for all requests that
1030
   *  are still running.
1031
   */
1032
0
  ret = fr_timer_list_force_run(worker->timeout);
1033
0
  if (unlikely(ret < 0)) {
1034
0
    fr_assert_msg(0, "Failed to force run the timeout list");
1035
0
  } else {
1036
0
    count += ret;
1037
0
  }
1038
1039
0
  fr_assert(fr_heap_num_elements(worker->runnable) == 0);
1040
1041
0
  DEBUG("Worker is exiting - stopped %u requests", count);
1042
1043
  /*
1044
   *  Signal the channels that we're closing.
1045
   *
1046
   *  The other end owns the channel, and will take care of
1047
   *  popping messages in the TO_RESPONDER queue, and marking
1048
   *  them FR_MESSAGE_DONE.  It will ignore the messages in
1049
   *  the TO_REQUESTOR queue, as we own those.  They will be
1050
   *  automatically freed when our talloc context is freed.
1051
   */
1052
0
  for (i = 0; i < worker->config.max_channels; i++) {
1053
0
    if (!worker->channel[i].ch) continue;
1054
1055
0
    worker_requests_cancel(&worker->channel[i]);
1056
1057
0
    fr_assert_msg(fr_dlist_num_elements(&worker->channel[i].dlist) == 0,
1058
0
            "Pending messages in channel after cancelling request");
1059
1060
0
    fr_channel_responder_ack_close(worker->channel[i].ch);
1061
0
  }
1062
1063
0
  talloc_free(worker);
1064
0
}
1065
1066
/** Internal request (i.e. one generated by the interpreter) is now complete
1067
 *
1068
 */
1069
static void _worker_request_internal_init(request_t *request, void *uctx)
1070
0
{
1071
0
  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1072
0
  fr_time_t now = fr_time();
1073
1074
0
  worker_request_init(worker, request, now);
1075
1076
  /*
1077
   *  Requests generated by the interpreter
1078
   *  are always marked up as internal.
1079
   */
1080
0
  fr_assert(request_is_internal(request));
1081
0
  if (worker_request_time_tracking_start(worker, request, now) < 0) {
1082
0
    unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
1083
0
  }
1084
0
}
1085
1086
1087
/** External request is now complete
1088
 *
1089
 */
1090
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1091
0
{
1092
0
  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1093
0
  fr_time_t   now = fr_time();
1094
1095
  /*
1096
   *  All external requests MUST have a listener.
1097
   */
1098
0
  fr_assert(request_is_external(request));
1099
0
  fr_assert(request->async->listen != NULL);
1100
1101
  /*
1102
   *  Only real packets are in the dedup tree.  And even
1103
   *  then, only some of the time.
1104
   */
1105
0
  if (request->async->listen->track_duplicates && fr_rb_node_inline_in_tree(&request->dedup_node)) {
1106
0
    (void) fr_rb_delete(worker->dedup, request);
1107
0
  }
1108
1109
  /*
1110
   *  If we're running a real request, then the final
1111
   *  indentation MUST be zero.  Otherwise we skipped
1112
   *  something!
1113
   *
1114
   *  Also check that the request is NOT marked as
1115
   *  "yielded", but is in fact done.
1116
   *
1117
   *  @todo - check that the stack is at frame 0, otherwise
1118
   *  more things have gone wrong.
1119
   */
1120
0
  fr_assert_msg(request_is_internal(request) || request_is_detached(request) || (request->log.indent.unlang == 0),
1121
0
          "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang);
1122
0
  fr_assert_msg(!unlang_interpret_is_resumable(request),
1123
0
          "Request %s is marked as yielded at end of processing", request->name);
1124
0
  fr_assert_msg(unlang_interpret_stack_depth(request) == 0,
1125
0
          "Request %s stack depth %u > 0", request->name, unlang_interpret_stack_depth(request));
1126
0
  RDEBUG("Done request");
1127
1128
  /*
1129
   *  The request is done.  Track that.
1130
   */
1131
0
  worker_request_time_tracking_end(worker, request, now);
1132
1133
  /*
1134
   *  Remove it from the list of requests associated with this channel.
1135
   */
1136
0
  if (fr_dlist_entry_in_list(&request->async->entry)) {
1137
0
    fr_dlist_entry_unlink(&request->async->entry);
1138
0
  }
1139
1140
  /*
1141
   *  These conditions are true when the server is
1142
   *  exiting and we're stopping all the requests.
1143
   *
1144
   *  This should never happen otherwise.
1145
   */
1146
0
  if (unlikely(!fr_channel_active(request->async->channel))) {
1147
0
    fr_dlist_entry_unlink(&request->listen_entry);
1148
0
    request_slab_release(request);
1149
0
    return;
1150
0
  }
1151
1152
0
  worker_send_reply(worker, request, !unlang_request_is_cancelled(request), now);
1153
0
  request_slab_release(request);
1154
0
}
1155
1156
/** Internal request (i.e. one generated by the interpreter) is now complete
1157
 *
1158
 * Whatever generated the request is now responsible for freeing it.
1159
 */
1160
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1161
0
{
1162
0
  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1163
1164
0
  worker_request_time_tracking_end(worker, request, fr_time());
1165
1166
0
  fr_assert(!fr_heap_entry_inserted(request->runnable));
1167
0
  fr_assert(!fr_timer_armed(request->timeout));
1168
0
  fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1169
0
}
1170
1171
/** Detached request (i.e. one generated by the interpreter with no parent) is now complete
1172
 *
1173
 * As the request has no parent, then there's nothing to free it
1174
 * so we have to.
1175
 */
1176
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
1177
0
{
1178
  /*
1179
   *  No time tracking for detached requests
1180
   *  so we don't need to call
1181
   *  worker_request_time_tracking_end.
1182
   */
1183
0
  fr_assert(!fr_heap_entry_inserted(request->runnable));
1184
1185
  /*
1186
   *  Normally worker_request_time_tracking_end
1187
   *  would remove the request from the time
1188
   *  order heap, but we need to do that for
1189
   *  detached requests.
1190
   */
1191
0
  TALLOC_FREE(request->timeout);
1192
1193
0
  fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1194
1195
  /*
1196
   *  Detached requests have to be freed by us
1197
   *  as nothing else can free them.
1198
   *
1199
   *  All other requests must be freed by the
1200
   *  code which allocated them.
1201
   */
1202
0
  talloc_free(request);
1203
0
}
1204
1205
1206
/** Make us responsible for running the request
1207
 *
1208
 */
1209
static void _worker_request_detach(request_t *request, void *uctx)
1210
0
{
1211
0
  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1212
0
  fr_time_t now = fr_time();
1213
1214
0
  RDEBUG4("%s - Request detaching", __FUNCTION__);
1215
1216
0
  if (request_is_detachable(request)) {
1217
    /*
1218
    * End the time tracking...  We don't track detached requests,
1219
    * because they don't contribute for the time consumed by an
1220
    * external request.
1221
    */
1222
0
    if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
1223
0
      RDEBUG3("Forcing time tracking to running state, from yielded, for request detach");
1224
0
      fr_time_tracking_resume(&request->async->tracking, now);
1225
0
    }
1226
0
    worker_request_time_tracking_end(worker, request, now);
1227
1228
0
    if (request_detach(request) < 0) RPEDEBUG("Failed detaching request");
1229
1230
0
    RDEBUG3("Request is detached");
1231
0
  } else {
1232
0
    fr_assert_msg(0, "Request is not detachable");
1233
0
  }
1234
1235
0
  return;
1236
0
}
1237
1238
/** Request is now runnable
1239
 *
1240
 */
1241
static void _worker_request_runnable(request_t *request, void *uctx)
1242
0
{
1243
0
  fr_worker_t *worker = uctx;
1244
1245
0
  RDEBUG4("%s - Request marked as runnable", __FUNCTION__);
1246
0
  fr_heap_insert(&worker->runnable, request);
1247
0
}
1248
1249
/** Interpreter yielded request
1250
 *
1251
 */
1252
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
1253
0
{
1254
0
  RDEBUG4("%s - Request yielded", __FUNCTION__);
1255
0
  if (likely(!request_is_detached(request))) fr_time_tracking_yield(&request->async->tracking, fr_time());
1256
0
}
1257
1258
/** Interpreter is starting to work on request again
1259
 *
1260
 */
1261
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
1262
0
{
1263
0
  RDEBUG4("%s - Request resuming", __FUNCTION__);
1264
0
  if (likely(!request_is_detached(request))) fr_time_tracking_resume(&request->async->tracking, fr_time());
1265
0
}
1266
1267
/** Check if a request is scheduled
1268
 *
1269
 */
1270
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
1271
0
{
1272
0
  return fr_heap_entry_inserted(request->runnable);
1273
0
}
1274
1275
/** Update a request's priority
1276
 *
1277
 */
1278
static void _worker_request_prioritise(request_t *request, void *uctx)
1279
0
{
1280
0
  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1281
1282
0
  RDEBUG4("%s - Request priority changed", __FUNCTION__);
1283
1284
  /* Extract the request from the runnable queue _if_ it's in the runnable queue */
1285
0
  if (fr_heap_extract(&worker->runnable, request) < 0) return;
1286
1287
  /* Reinsert it to re-evaluate its new priority */
1288
0
  fr_heap_insert(&worker->runnable, request);
1289
0
}
1290
1291
/** Run a request
1292
 *
1293
 *  Until it either yields, or is done.
1294
 *
1295
 *  This function is also responsible for sending replies, and
1296
 *  cleaning up the request.
1297
 *
1298
 * @param[in] worker the worker
1299
 * @param[in] start the current time
1300
 */
1301
static inline CC_HINT(always_inline) void worker_run_request(fr_worker_t *worker, fr_time_t start)
1302
0
{
1303
0
  request_t *request;
1304
0
  fr_time_t now;
1305
1306
0
  WORKER_VERIFY;
1307
1308
0
  now = start;
1309
1310
  /*
1311
   *  Busy-loop running requests for 1ms.  We still poll the
1312
   *  event loop 1000 times a second, OR when there's no
1313
   *  more work to do.  This allows us to make progress with
1314
   *  ongoing requests, at the expense of sometimes ignoring
1315
   *  new ones.
1316
   */
1317
0
  while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) &&
1318
0
         ((request = fr_heap_pop(&worker->runnable)) != NULL)) {
1319
1320
0
    REQUEST_VERIFY(request);
1321
0
    fr_assert(!fr_heap_entry_inserted(request->runnable));
1322
1323
    /*
1324
     *  For real requests, if the channel is gone,
1325
     *  just stop the request and free it.
1326
     */
1327
0
    if (request->async->channel && !fr_channel_active(request->async->channel)) {
1328
0
      worker_stop_request(request);
1329
0
      continue;
1330
0
    }
1331
1332
0
    (void)unlang_interpret(request, UNLANG_REQUEST_RESUME);
1333
1334
0
    now = fr_time();
1335
0
  }
1336
0
}
1337
1338
/** Create a worker
1339
 *
1340
 * @param[in] ctx the talloc context
1341
 * @param[in] name the name of this worker
1342
 * @param[in] el the event list
1343
 * @param[in] logger the destination for all logging messages
1344
 * @param[in] lvl log level
1345
 * @param[in] config various configuration parameters
1346
 * @return
1347
 *  - NULL on error
1348
 *  - fr_worker_t on success
1349
 */
1350
fr_worker_t *fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl,
1351
           fr_worker_config_t *config)
1352
0
{
1353
0
  fr_worker_t *worker;
1354
1355
0
  worker = talloc_zero(ctx, fr_worker_t);
1356
0
  if (!worker) {
1357
0
nomem:
1358
0
    fr_strerror_const("Failed allocating memory");
1359
0
    return NULL;
1360
0
  }
1361
1362
0
  worker->name = talloc_strdup(worker, name); /* thread locality */
1363
1364
0
  if (config) worker->config = *config;
1365
1366
0
#define CHECK_CONFIG(_x, _min, _max) do { \
1367
0
    if (!worker->config._x) worker->config._x = _min; \
1368
0
    if (worker->config._x < _min) worker->config._x = _min; \
1369
0
    if (worker->config._x > _max) worker->config._x = _max; \
1370
0
       } while (0)
1371
1372
0
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \
1373
0
    if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \
1374
0
    if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \
1375
0
       } while (0)
1376
1377
0
  CHECK_CONFIG(max_requests,1024,(1 << 30));
1378
0
  CHECK_CONFIG(max_channels, 64, 1024);
1379
0
  CHECK_CONFIG(reuse.child_pool_size, 4096, 65536);
1380
0
  CHECK_CONFIG(message_set_size, 1024, 8192);
1381
0
  CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));
1382
0
  CHECK_CONFIG_TIME_DELTA(max_request_time, fr_time_delta_from_sec(5), fr_time_delta_from_sec(120));
1383
1384
0
  worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels);
1385
0
  if (!worker->channel) {
1386
0
    talloc_free(worker);
1387
0
    goto nomem;
1388
0
  }
1389
1390
0
  worker->thread_id = pthread_self();
1391
0
  worker->el = el;
1392
0
  worker->log = logger;
1393
0
  worker->lvl = lvl;
1394
1395
  /*
1396
   *  The worker thread starts now.  Manually initialize it,
1397
   *  because we're tracking request time, not the time that
1398
   *  the worker thread is running.
1399
   */
1400
0
  memset(&worker->tracking, 0, sizeof(worker->tracking));
1401
1402
0
  worker->aq_control = fr_atomic_queue_talloc(worker, 1024);
1403
0
  if (!worker->aq_control) {
1404
0
    fr_strerror_const("Failed creating atomic queue");
1405
0
  fail:
1406
0
    talloc_free(worker);
1407
0
    return NULL;
1408
0
  }
1409
1410
0
  worker->control = fr_control_create(worker, el, worker->aq_control, 7);
1411
0
  if (!worker->control) {
1412
0
    fr_strerror_const_push("Failed creating control plane");
1413
0
    goto fail;
1414
0
  }
1415
1416
0
  if (fr_control_callback_add(&worker->control, FR_CONTROL_ID_CHANNEL, worker, worker_channel_callback) < 0) {
1417
0
    fr_strerror_const_push("Failed adding control channel");
1418
0
    goto fail;
1419
0
  }
1420
1421
0
  if (fr_control_callback_add(&worker->control, FR_CONTROL_ID_LISTEN_DEAD, worker, worker_listen_cancel_callback) < 0) {
1422
0
    fr_strerror_const_push("Failed adding callback for listeners");
1423
0
    goto fail;
1424
0
  }
1425
1426
0
  if (fr_control_open(worker->control) < 0) {
1427
0
    fr_strerror_const_push("Failed opening control plane");
1428
0
    goto fail;
1429
0
  }
1430
1431
0
  worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable, 0);
1432
0
  if (!worker->runnable) {
1433
0
    fr_strerror_const("Failed creating runnable heap");
1434
0
    goto fail;
1435
0
  }
1436
1437
0
  worker->timeout = fr_timer_list_ordered_alloc(worker, el->tl);
1438
0
  if (!worker->timeout) {
1439
0
    fr_strerror_const("Failed creating timeouts list");
1440
0
    goto fail;
1441
0
  }
1442
1443
0
  worker->dedup = fr_rb_inline_talloc_alloc(worker, request_t, dedup_node, worker_dedup_cmp, NULL);
1444
0
  if (!worker->dedup) {
1445
0
    fr_strerror_const("Failed creating de_dup tree");
1446
0
    goto fail;
1447
0
  }
1448
1449
0
  worker->listeners = fr_rb_inline_talloc_alloc(worker, fr_worker_listen_t, node, worker_listener_cmp, NULL);
1450
0
  if (!worker->listeners) {
1451
0
    fr_strerror_const("Failed creating listener tree");
1452
0
    goto fail;
1453
0
  }
1454
1455
0
  worker->intp = unlang_interpret_init(worker, el,
1456
0
               &(unlang_request_func_t){
1457
0
              .init_internal = _worker_request_internal_init,
1458
1459
0
              .done_external = _worker_request_done_external,
1460
0
              .done_internal = _worker_request_done_internal,
1461
0
              .done_detached = _worker_request_done_detached,
1462
1463
0
              .detach = _worker_request_detach,
1464
0
              .yield = _worker_request_yield,
1465
0
              .resume = _worker_request_resume,
1466
0
              .mark_runnable = _worker_request_runnable,
1467
1468
0
              .scheduled = _worker_request_scheduled,
1469
0
              .prioritise = _worker_request_prioritise
1470
0
               },
1471
0
               worker);
1472
0
  if (!worker->intp){
1473
0
    fr_strerror_const("Failed initialising interpreter");
1474
0
    goto fail;
1475
0
  }
1476
1477
0
  {
1478
0
    if (!worker->config.reuse.child_pool_size) worker->config.reuse.child_pool_size = REQUEST_POOL_SIZE;
1479
0
    if (!worker->config.reuse.num_children) worker->config.reuse.num_children = REQUEST_POOL_NUM_OBJECTS;
1480
1481
0
    if (!(worker->slab = request_slab_list_alloc(worker, el, &worker->config.reuse, NULL, NULL,
1482
0
                   UNCONST(void *, worker), true, false))) {
1483
0
      fr_strerror_const("Failed creating request slab list");
1484
0
      goto fail;
1485
0
    }
1486
0
  }
1487
1488
0
  unlang_interpret_set_thread_default(worker->intp);
1489
1490
0
  return worker;
1491
0
}
1492
1493
1494
/** The main loop and entry point of the stand-alone worker thread.
1495
 *
1496
 *  Where there is only one thread, the event loop runs fr_worker_pre_event() and fr_worker_post_event()
1497
 *  instead, And then fr_worker_post_event() takes care of calling worker_run_request() to actually run the
1498
 *  request.
1499
 *
1500
 * @param[in] worker the worker data structure to manage
1501
 */
1502
void fr_worker(fr_worker_t *worker)
1503
0
{
1504
0
  WORKER_VERIFY;
1505
1506
0
  while (true) {
1507
0
    bool wait_for_event;
1508
0
    int num_events;
1509
1510
0
    WORKER_VERIFY;
1511
1512
    /*
1513
     *  There are runnable requests.  We still service
1514
     *  the event loop, but we don't wait for events.
1515
     */
1516
0
    wait_for_event = (fr_heap_num_elements(worker->runnable) == 0);
1517
0
    if (wait_for_event) {
1518
0
      if (worker->exiting && (worker_num_requests(worker) == 0)) break;
1519
1520
0
      DEBUG4("Ready to process requests");
1521
0
    }
1522
1523
    /*
1524
     *  Check the event list.  If there's an error
1525
     *  (e.g. exit), we stop looping and clean up.
1526
     */
1527
0
    DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait");
1528
0
    num_events = fr_event_corral(worker->el, fr_time(), wait_for_event);
1529
0
    if (num_events < 0) {
1530
0
      if (fr_event_loop_exiting(worker->el)) {
1531
0
        DEBUG4("Event loop exiting");
1532
0
        break;
1533
0
      }
1534
1535
0
      PERROR("Failed retrieving events");
1536
0
      break;
1537
0
    }
1538
1539
0
    DEBUG4("%u event(s) pending", num_events);
1540
1541
    /*
1542
     *  Service outstanding events.
1543
     */
1544
0
    if (num_events > 0) {
1545
0
      DEBUG4("Servicing event(s)");
1546
0
      fr_event_service(worker->el);
1547
0
    }
1548
1549
    /*
1550
     *  Run any outstanding requests.
1551
     */
1552
0
    worker_run_request(worker, fr_time());
1553
0
  }
1554
0
}
1555
1556
/** Pre-event handler
1557
 *
1558
 *  This should be run ONLY in single-threaded mode!
1559
 */
1560
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
1561
0
{
1562
0
  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1563
0
  request_t *request;
1564
1565
0
  request = fr_heap_peek(worker->runnable);
1566
0
  if (!request) return 0;
1567
1568
  /*
1569
   *  There's work to do.  Tell the event handler to poll
1570
   *  for IO / timers, but also immediately return to the
1571
   *  calling function, which has more work to do.
1572
   */
1573
0
  return 1;
1574
0
}
1575
1576
1577
/** Post-event handler
1578
 *
1579
 *  This should be run ONLY in single-threaded mode!
1580
 */
1581
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
1582
0
{
1583
0
  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1584
1585
0
  worker_run_request(worker, fr_time());  /* Event loop time can be too old, and trigger asserts */
1586
0
}
1587
1588
/** Print debug information about the worker structure
1589
 *
1590
 * @param[in] worker the worker
1591
 * @param[in] fp the file where the debug output is printed.
1592
 */
1593
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
1594
0
{
1595
0
  WORKER_VERIFY;
1596
1597
0
  fprintf(fp, "\tnum_channels = %d\n", worker->num_channels);
1598
0
  fprintf(fp, "\tstats.in = %" PRIu64 "\n", worker->stats.in);
1599
1600
0
  fprintf(fp, "\tcalculated (predicted) total CPU time = %" PRIu64 "\n",
1601
0
    fr_time_delta_unwrap(worker->predicted) * worker->stats.in);
1602
0
  if (worker->stats.in) {
1603
0
    fprintf(fp, "\tcalculated (counted) per request time = %" PRIu64 "\n",
1604
0
      fr_time_delta_unwrap(worker->tracking.running_total) / worker->stats.in);
1605
0
  }
1606
1607
0
  fr_time_tracking_debug(&worker->tracking, fp);
1608
1609
0
}
1610
1611
/** Create a channel to the worker
1612
 *
1613
 * Called by the master (i.e. network) thread when it needs to create
1614
 * a new channel to a particuler worker.
1615
 *
1616
 * @param[in] worker the worker
1617
 * @param[in] master the control plane of the master
1618
 * @param[in] ctx the context in which the channel will be created
1619
 */
1620
fr_channel_t *fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
1621
0
{
1622
0
  fr_channel_t *ch;
1623
0
  pthread_t id;
1624
0
  bool same;
1625
1626
0
  WORKER_VERIFY;
1627
1628
0
  id = pthread_self();
1629
0
  same = (pthread_equal(id, worker->thread_id) != 0);
1630
1631
0
  ch = fr_channel_create(ctx, master, worker->control, same);
1632
0
  if (!ch) return NULL;
1633
1634
0
  fr_channel_set_recv_request(ch, worker, worker_recv_request);
1635
1636
  /*
1637
   *  Tell the worker about the channel
1638
   */
1639
0
  if (fr_channel_signal_open(ch) < 0) {
1640
0
    talloc_free(ch);
1641
0
    return NULL;
1642
0
  }
1643
1644
0
  return ch;
1645
0
}
1646
1647
/** Cancel the requests associated with a listener
 *
 * When called from the worker's own thread the work is done directly.
 * Otherwise a FR_CONTROL_ID_LISTEN_DEAD control message carrying the
 * listener pointer is sent to the worker.
 *
 * @param[in] worker	the worker
 * @param[in] li	the listener
 * @return
 *	- -1 if the ring buffer could not be initialised.
 *	- otherwise, the return value of the function doing the work.
 */
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
{
	fr_ring_buffer_t *rb;

	if (!is_worker_thread(worker)) {
		rb = fr_worker_rb_init();
		if (rb == NULL) return -1;

		return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN_DEAD, &li, sizeof(li));
	}

	/*
	 *	We're already in the worker thread, so
	 *	there's no need to go via the control plane.
	 */
	return fr_worker_listen_cancel_self(worker, li);
}
1663
1664
#ifdef WITH_VERIFY_PTR
/** Verify the worker data structures.
 *
 * Checks the talloc type of the worker and of each of its main
 * components, and of every channel slot which is in use.
 *
 * @param[in] worker the worker
 */
static void worker_verify(fr_worker_t *worker)
{
	int slot;

	(void) talloc_get_type_abort(worker, fr_worker_t);
	fr_atomic_queue_verify(worker->aq_control);

	fr_assert(worker->control != NULL);
	(void) talloc_get_type_abort(worker->control, fr_control_t);

	fr_assert(worker->el != NULL);
	(void) talloc_get_type_abort(worker->el, fr_event_list_t);

	fr_assert(worker->runnable != NULL);
	(void) talloc_get_type_abort(worker->runnable, fr_heap_t);

	fr_assert(worker->dedup != NULL);
	(void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);

	/*
	 *	Unused channel slots have a NULL channel, skip those.
	 */
	for (slot = 0; slot < worker->config.max_channels; slot++) {
		if (worker->channel[slot].ch) {
			(void) talloc_get_type_abort(worker->channel[slot].ch, fr_channel_t);
		}
	}
}
#endif
1695
1696
/** Copy worker statistics into a caller supplied array
 *
 * The statistics are written in this order: in, out, dup, dropped,
 * num_naks, num_active.
 *
 * @param[in] worker	the worker
 * @param[in] num	number of entries available in stats.
 * @param[out] stats	where to write the statistics.
 * @return
 *	- -1 if num is negative.
 *	- otherwise the number of entries written, which is at most 6.
 */
int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
{
	int i, written;

	if (num < 0) return -1;
	if (num == 0) return 0;

	written = (num <= 6) ? num : 6;

	{
		uint64_t const values[6] = {
			worker->stats.in,
			worker->stats.out,
			worker->stats.dup,
			worker->stats.dropped,
			worker->num_naks,
			worker->num_active
		};

		for (i = 0; i < written; i++) stats[i] = values[i];
	}

	return written;
}
1712
1713
/** Command callback which prints statistics for a worker
 *
 * With no arguments both the "count" and "cpu" groups are printed.
 * Otherwise only the group named by the first argument is printed.
 *
 * @param[in] fp	where the statistics are written.
 * @param[in] fp_err	unused.
 * @param[in] ctx	the fr_worker_t.
 * @param[in] info	the command arguments.
 * @return 0 always.
 */
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
{
	fr_worker_t const	*worker = ctx;
	fr_time_delta_t		delta;
	bool const		show_all = (info->argc == 0);

	if (show_all || (strcmp(info->argv[0], "count") == 0)) {
		fprintf(fp, "count.in\t\t\t%" PRIu64 "\n", worker->stats.in);
		fprintf(fp, "count.out\t\t\t%" PRIu64 "\n", worker->stats.out);
		fprintf(fp, "count.dup\t\t\t%" PRIu64 "\n", worker->stats.dup);
		fprintf(fp, "count.dropped\t\t\t%" PRIu64 "\n", worker->stats.dropped);
		fprintf(fp, "count.naks\t\t\t%" PRIu64 "\n", worker->num_naks);
		fprintf(fp, "count.active\t\t\t%" PRIu64 "\n", worker->num_active);
		fprintf(fp, "count.runnable\t\t\t%u\n", fr_heap_num_elements(worker->runnable));
	}

	if (show_all || (strcmp(info->argv[0], "cpu") == 0)) {
		delta = worker->predicted;
		fprintf(fp, "cpu.request_time_rtt\t\t%.9f\n", fr_time_delta_unwrap(delta) / (double)NSEC);

		/*
		 *	Average over the requests which weren't dropped.
		 *	If that can't be calculated, the running total
		 *	is printed unchanged.
		 */
		delta = worker->tracking.running_total;
		if (fr_time_delta_ispos(delta) && (worker->stats.in > worker->stats.dropped)) {
			delta = fr_time_delta_div(delta, fr_time_delta_wrap(worker->stats.in - worker->stats.dropped));
		}
		fprintf(fp, "cpu.average_request_time\t%.9f\n", fr_time_delta_unwrap(delta) / (double)NSEC);

		delta = worker->tracking.running_total;
		fprintf(fp, "cpu.used\t\t\t%.6f\n", fr_time_delta_unwrap(delta) / (double)NSEC);

		delta = worker->tracking.waiting_total;
		fprintf(fp, "cpu.waiting\t\t\t%.3f\n", fr_time_delta_unwrap(delta) / (double)NSEC);

		fr_time_elapsed_fprint(fp, &worker->cpu_time, "cpu.requests", 4);
		fr_time_elapsed_fprint(fp, &worker->wall_clock, "time.requests", 4);
	}

	return 0;
}
1750
1751
fr_cmd_table_t cmd_worker_table[] = {
1752
  {
1753
    .parent = "stats",
1754
    .name = "worker",
1755
    .help = "Statistics for workers threads.",
1756
    .read_only = true
1757
  },
1758
1759
  {
1760
    .parent = "stats worker",
1761
    .add_name = true,
1762
    .name = "self",
1763
    .syntax = "[(count|cpu)]",
1764
    .func = cmd_stats_worker,
1765
    .help = "Show statistics for a specific worker thread.",
1766
    .read_only = true
1767
  },
1768
1769
  CMD_TABLE_END
1770
};