Coverage Report

Created: 2026-06-30 07:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/freeradius-server/src/lib/io/coord_pair.c
Line
Count
Source
1
/*
2
 *   This program is free software; you can redistribute it and/or modify
3
 *   it under the terms of the GNU General Public License as published by
4
 *   the Free Software Foundation; either version 2 of the License, or
5
 *   (at your option) any later version.
6
 *
7
 *   This program is distributed in the hope that it will be useful,
8
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *   GNU General Public License for more details.
11
 *
12
 *   You should have received a copy of the GNU General Public License
13
 *   along with this program; if not, write to the Free Software
14
 *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15
 */
16
17
/**
18
 * $Id: ec9a7b3c72087b3bf84af340fac59b9f8905dc60 $
19
 *
20
 * @brief Sending pair lists to and from coordination threads
21
 * @file io/coord_pair.c
22
 *
23
 * @copyright 2026 Network RADIUS SAS (legal@networkradius.com)
24
 */
25
RCSID("$Id: ec9a7b3c72087b3bf84af340fac59b9f8905dc60 $")
26
27
#include <freeradius-devel/internal/internal.h>
28
#include <freeradius-devel/io/listen.h>
29
#include <freeradius-devel/io/coord_pair.h>
30
#include <freeradius-devel/io/coord_priv.h>
31
#include <freeradius-devel/server/main_config.h>
32
#include <freeradius-devel/unlang/base.h>
33
34
/* Process-wide counter; coord_pair_request_name_number() takes the next value for each request */
static _Atomic(uint64_t) request_number = 0;
35
36
/* Slab allocator types and functions for request_t (request_slab_*) */
FR_SLAB_TYPES(request, request_t)
37
0
FR_SLAB_FUNCS(request, request_t)
Unexecuted instantiation: coord_pair.c:request_slab_reserve
Unexecuted instantiation: coord_pair.c:request_slab_element_init
Unexecuted instantiation: coord_pair.c:request_slab_release
Unexecuted instantiation: coord_pair.c:request_slab_list_alloc
Unexecuted instantiation: coord_pair.c:request_slab_init
Unexecuted instantiation: coord_pair.c:_request_slab_cleanup
38
39
/* List of all registrations; allocated by the first registration, freed when the last one is freed */
static fr_dlist_head_t  *coord_pair_regs = NULL;
40
/* Module list allocated and freed together with coord_pair_regs */
static module_list_t  *coord_pair_modules;
41
/* "Worker-Id" attribute from the internal dictionary, resolved on first registration */
static fr_dict_attr_t const *attr_worker_id = NULL;
42
43
/** Registration of pair list callbacks
44
 *
 * Created by fr_coord_pair_register() and linked into the file-scope
 * list of registrations until it is freed.
45
 */
46
struct fr_coord_pair_reg_s {
47
  char const      *name;      //!< Name for log / request name.
48
  fr_dlist_t      entry;      //!< Entry in list of pair list registrations
49
  fr_dict_attr_t const    *attr_packet_type;  //!< Packet type attribute of the virtual server.
50
  fr_dict_attr_t const    *root;      //!< Pair list decoding root attribute
51
  fr_coord_worker_pair_cb_reg_t **callbacks;    //!< Array of pointers to callbacks, indexed by packet type.
52
  uint32_t      max_packet_type;  //!< Largest registered packet type; callbacks has one more slot than this.
53
  uint32_t      cb_id;      //!< The coordinator callback ID used for pair list handling
54
  fr_time_delta_t     max_request_time; //!< Maximum time for coordinator request processing.
55
  fr_slab_config_t    reuse;      //!< Request slab allocation config, parsed from the "reuse" section.
56
  virtual_server_t const    *vs;      //!< Virtual server containing coordinator process sections.
57
};
58
59
/** Coordinator instance data for pair list handling
 *
 * Allocated by fr_coord_pair_create() and passed as uctx to the
 * interpreter callbacks defined in this file.
 */
struct fr_coord_pair_s {
60
  fr_coord_t      *coord;     //!< Coordinator which this coord pair is attached to.
61
  fr_coord_pair_reg_t   *coord_pair_reg;  //!< Registration details for this coord pair
62
  fr_event_list_t     *el;      //!< Event list for interpreter.
63
  unlang_interpret_t    *intp;      //!< Interpreter for running requests.
64
  fr_heap_t     *runnable;    //!< Current runnable requests.
65
66
  fr_timer_list_t     *timeout;   //!< Ordered timer list holding the per-request timeout timers.
67
  fr_time_delta_t     predicted;    //!< How long we predict a request will take to execute.
68
  fr_time_tracking_t    tracking;   //!< How much time the coordinator has spent doing things.
69
  uint64_t      num_active;   //!< Number of requests currently being time tracked.
70
  request_slab_list_t   *slab;      //!< slab allocator for request_t
71
};
72
73
/** Packet context used when coordinator messages are processed through an interpreter
74
 *
75
 * Allows access to the coordinator structure and arbitrary data
76
 * throughout the state machine.  Stored in request->async->packet_ctx.
77
 */
78
typedef struct {
79
  fr_coord_pair_t     *coord_pair;    //!< Coordinator pair this packet is for.
80
  void        *uctx;      //!< Source specific ctx; callers in this file pass the fr_coord_pair_reg_t.
81
} fr_coord_packet_ctx_t;
82
83
/** Conf parser to read request slab settings from the "reuse" config section
84
 */
85
static const conf_parser_t request_reuse_config[] = {
86
  FR_SLAB_CONFIG_CONF_PARSER
87
  CONF_PARSER_TERMINATOR
88
};
89
90
/** Remove a coord pair registration from the list when it is freed
91
 */
92
static int _coord_pair_reg_free(fr_coord_pair_reg_t *to_free)
93
0
{
94
0
  fr_assert(coord_pair_regs);
95
96
0
  fr_dlist_remove(coord_pair_regs, to_free);
97
98
  /* If all the registrations are gone, free the list */
99
0
  if (fr_dlist_num_elements(coord_pair_regs) == 0) {
100
0
    TALLOC_FREE(coord_pair_regs);
101
0
    TALLOC_FREE(coord_pair_modules);
102
0
  }
103
0
  return 0;
104
0
}
105
106
/** Register a set of callbacks for pair list based coordinator messages
107
 *
108
 * Returns a structure to pass as uctx to fr_coord_cb_t using the
109
 * macro FR_COORD_PAIR_CB_CTX_SET.
110
 *
111
 * @param reg_ctx Callback details to register.
112
 */
113
fr_coord_pair_reg_t *fr_coord_pair_register(fr_coord_pair_reg_ctx_t *reg_ctx)
114
0
{
115
0
  fr_coord_pair_reg_t   *coord_pair_reg;
116
0
  fr_coord_worker_pair_cb_reg_t *cb_reg = reg_ctx->worker_cb;
117
0
  CONF_SECTION      *cs;
118
0
  CONF_PAIR     *cp;
119
120
0
  fr_assert(reg_ctx->root);
121
122
  /* Resolve the Worker-Id attribute if not already done */
123
0
  if (!attr_worker_id) {
124
0
    attr_worker_id = fr_dict_attr_by_name(NULL, fr_dict_root(fr_dict_internal()), "Worker-Id");
125
0
    if (!attr_worker_id) {
126
0
      ERROR("Failed to resolve Worker-Id attribute");
127
0
      return NULL;
128
0
    }
129
0
  }
130
131
0
  if (!coord_pair_regs) {
132
0
    MEM(coord_pair_regs = talloc_zero(NULL, fr_dlist_head_t));
133
0
    fr_dlist_init(coord_pair_regs, fr_coord_pair_reg_t, entry);
134
0
    MEM(coord_pair_modules = module_list_alloc(NULL, &module_list_type_global, "coord", true));
135
0
  }
136
137
0
  MEM(coord_pair_reg = talloc(coord_pair_regs, fr_coord_pair_reg_t));
138
0
  *coord_pair_reg = (fr_coord_pair_reg_t) {
139
0
    .name = reg_ctx->name,
140
0
    .root = reg_ctx->root,
141
0
    .cb_id = reg_ctx->cb_id,
142
0
    .max_request_time = fr_time_delta_eq(reg_ctx->max_request_time, fr_time_delta_from_msec(0)) ?
143
0
      main_config->worker.max_request_time : reg_ctx->max_request_time,
144
0
  };
145
146
0
  while (cb_reg->callback) {
147
0
    if (cb_reg->packet_type > coord_pair_reg->max_packet_type) {
148
0
      coord_pair_reg->max_packet_type = cb_reg->packet_type;
149
0
    }
150
0
    cb_reg++;
151
0
  }
152
153
  /*
154
   *  A sane limit on packet type values to avoid a huge array.
155
   *  If larger values are needed in the future we can use a folded array.
156
   */
157
0
  fr_assert(coord_pair_reg->max_packet_type <= 256);
158
159
0
  MEM(coord_pair_reg->callbacks = talloc_zero_array(coord_pair_reg, fr_coord_worker_pair_cb_reg_t *,
160
0
                coord_pair_reg->max_packet_type + 1));
161
162
0
  cb_reg = reg_ctx->worker_cb;
163
0
  while (cb_reg->callback) {
164
0
    coord_pair_reg->callbacks[cb_reg->packet_type] = cb_reg;
165
0
    cb_reg++;
166
0
  }
167
168
0
  cs = cf_section_find(reg_ctx->cs, "reuse", NULL);
169
170
  /*
171
   *  Create an empty "reuse" section if one is not found, so defaults are applied
172
   */
173
0
  if (!cs) {
174
0
    cs = cf_section_alloc(reg_ctx->cs, reg_ctx->cs, "reuse", NULL);
175
0
  }
176
177
0
  if (cf_section_rules_push(cs, request_reuse_config) < 0) {
178
0
  fail:
179
0
    talloc_free(coord_pair_reg);
180
0
    return NULL;
181
0
  }
182
0
  if (cf_section_parse(coord_pair_reg, &coord_pair_reg->reuse, cs) < 0) goto fail;
183
184
  /*
185
   *  Set defaults for request slab allocation, if not set by conf parsing
186
   */
187
0
  if (!coord_pair_reg->reuse.child_pool_size) coord_pair_reg->reuse.child_pool_size = REQUEST_POOL_SIZE;
188
0
  if (!coord_pair_reg->reuse.num_children) coord_pair_reg->reuse.num_children = REQUEST_POOL_NUM_OBJECTS;
189
190
0
  cp = cf_pair_find(reg_ctx->cs, "virtual_server");
191
0
  if (!cp) {
192
0
    cf_log_err(reg_ctx->cs, "Missing virtual_server option");
193
0
    goto fail;
194
0
  }
195
196
0
  coord_pair_reg->vs = virtual_server_find(cf_pair_value(cp));
197
0
  if (!coord_pair_reg->vs) {
198
0
    cf_log_err(cp, "Virtual server not found");
199
0
    goto fail;
200
0
  }
201
202
  /*
203
   *  Validate that the virtual server uses the correct namespace.
204
   */
205
0
  if (reg_ctx->root->dict != virtual_server_dict_by_cs(virtual_server_cs(coord_pair_reg->vs))) {
206
0
    cf_log_err(cp, "Virtual server has namespace %s, should be %s",
207
0
         fr_dict_root(virtual_server_dict_by_cs(virtual_server_cs(coord_pair_reg->vs)))->name,
208
0
         fr_dict_root(coord_pair_reg->root->dict)->name);
209
0
    goto fail;
210
0
  }
211
0
  coord_pair_reg->attr_packet_type = virtual_server_packet_type_by_cs(virtual_server_cs(coord_pair_reg->vs));
212
213
0
  fr_dlist_insert_tail(coord_pair_regs, coord_pair_reg);
214
0
  talloc_set_destructor(coord_pair_reg, _coord_pair_reg_free);
215
216
0
  return coord_pair_reg;
217
0
}
218
219
/** Return the coordinator callback ID associated with a coord_pair_reg_t
220
 */
221
uint32_t fr_coord_pair_reg_cb_id(fr_coord_pair_reg_t *coord_pair_reg)
222
0
{
223
0
  fr_assert(coord_pair_reg);
224
0
  return coord_pair_reg->cb_id;
225
0
}
226
227
/*
228
 *  The following set of callbacks for request handling are mirrors of
229
 *  their equivalent in worker.c
230
 */
231
232
/** Signal the unlang interpreter that it needs to stop running the request
233
 *
234
 * @param[in] request request to cancel.  The request may still run to completion.
235
 */
236
static void coord_pair_stop_request(request_t *request)
237
0
{
238
0
  unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
239
0
}
240
241
/** Enforce max_request_time
242
 *
243
 * @param[in] tl  the coordinators's timer list.
244
 * @param[in] when  the current time
245
 * @param[in] uctx  the request_t timing out.
246
 */
247
static void _coord_pair_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
248
0
{
249
0
  request_t *request = talloc_get_type_abort(uctx, request_t);
250
251
0
  REDEBUG("Request has reached max_request_time - signalling it to stop");
252
0
  coord_pair_stop_request(request);
253
254
0
  request->rcode = RLM_MODULE_TIMEOUT;
255
0
}
256
257
/** Set, or re-set the request timer
258
 *
259
 * @param[in] coord_pair  the coord_pair_t containing the timeout lists.
260
 * @param[in] request   that we're timing out.
261
 * @param[in] timeout   the timeout to set.
262
 * @return
263
 *  - 0 on success.
264
 *  - -1 on failure.
265
 */
266
static int fr_coord_pair_request_timeout_set(fr_coord_pair_t *coord_pair, request_t *request, fr_time_delta_t timeout)
267
0
{
268
0
  if (unlikely(fr_timer_in(request, coord_pair->timeout, &request->timeout, timeout,
269
0
         true, _coord_pair_request_timeout, request) < 0)) {
270
0
    RERROR("Failed to create request timeout timer");
271
0
    return -1;
272
0
  }
273
274
0
  return 0;
275
0
}
276
277
/** Start time tracking for a request, and mark it as runnable.
278
 */
279
static int coord_pair_request_time_tracking_start(fr_coord_pair_t *coord_pair, request_t *request, fr_time_t now)
280
0
{
281
0
  fr_assert(!fr_timer_armed(request->timeout));
282
283
0
  if (unlikely(fr_coord_pair_request_timeout_set(coord_pair, request,
284
0
                   coord_pair->coord_pair_reg->max_request_time) < 0)) {
285
0
    RERROR("Failed to set request timeout");
286
0
    return -1;
287
0
  }
288
289
0
  RDEBUG3("Time tracking started in yielded state");
290
0
  fr_time_tracking_start(&coord_pair->tracking, &request->async->tracking, now);
291
0
  fr_time_tracking_yield(&request->async->tracking, now);
292
0
  coord_pair->num_active++;
293
294
0
  fr_assert(!fr_heap_entry_inserted(request->runnable));
295
0
  (void) fr_heap_insert(&coord_pair->runnable, request);
296
297
0
  return 0;
298
0
}
299
300
/** End time tracking for a request
301
 */
302
static void coord_pair_request_time_tracking_end(fr_coord_pair_t *coord_pair, request_t *request, fr_time_t now)
303
0
{
304
0
  RDEBUG3("Time tracking ended");
305
0
  fr_time_tracking_end(&coord_pair->predicted, &request->async->tracking, now);
306
0
  fr_assert(coord_pair->num_active > 0);
307
0
  coord_pair->num_active--;
308
309
0
  TALLOC_FREE(request->timeout);  /* Disarm the request timer */
310
0
}
311
312
313
static inline CC_HINT(always_inline)
314
void coord_pair_request_init(fr_event_list_t *el, request_t *request, fr_time_t now, void *packet_ctx)
315
{
316
  if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false));
317
  if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false));
318
319
  request->packet->timestamp = now;
320
  request->async = talloc_zero(request, fr_async_t);
321
  request->async->recv_time = now;
322
  request->async->el = el;
323
  request->async->packet_ctx = packet_ctx;
324
  fr_dlist_entry_init(&request->async->entry);
325
}
326
327
static inline CC_HINT(always_inline)
328
void coord_pair_request_name_number(request_t *request, char const *name)
329
0
{
330
0
  request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst);
331
0
  if (request->name) talloc_const_free(request->name);
332
0
  request->name = talloc_asprintf(request, "Coord-%s-%"PRIu64, name, request->number);
333
0
}
334
335
/** Slab element destructor: de-initialise a request
 *
 * @param[in] request	being returned to the slab.
 * @param[in] uctx	unused.
 * @return the result of request_slab_deinit().
 */
static int _coord_pair_request_deinit(request_t *request, UNUSED void *uctx)
{
	int	ret;

	ret = request_slab_deinit(request);

	return ret;
}
339
340
/** Reserve and initialise a request for the coordinator to run
 *
 * Takes a request from the slab, initialises it as an internal request in
 * the namespace of the registration's virtual server, attaches a packet
 * context, names and numbers it, and binds it to the coordinator's interpreter.
 *
 * @param[in] coord_pair	the request will belong to.
 * @param[in] now		request receive time.
 * @param[in] uctx		stored in the packet context.
 * @return
 *	- The new request on success.
 *	- NULL if the request could not be reserved or initialised.
 */
static request_t *coord_pair_request_bootstrap(fr_coord_pair_t *coord_pair, fr_time_t now, void *uctx)
{
	fr_coord_pair_reg_t	*reg = coord_pair->coord_pair_reg;
	fr_coord_packet_ctx_t	*packet_ctx;
	request_t		*request;

	request = request_slab_reserve(coord_pair->slab);
	if (!request) {
		ERROR("Coordinator failed allocating new request");
		return NULL;
	}

	request_slab_element_set_destructor(request, _coord_pair_request_deinit, coord_pair);

	if (request_init(request, REQUEST_TYPE_INTERNAL,
			 (&(request_init_args_t){
				.namespace = virtual_server_dict_by_cs(virtual_server_cs(reg->vs))
			 }))) {
		ERROR("Coordinator failed initializing new request");
		request_slab_release(request);
		return NULL;
	}

	/* The packet context is parented by the request */
	MEM(packet_ctx = talloc(request, fr_coord_packet_ctx_t));
	packet_ctx->coord_pair = coord_pair;
	packet_ctx->uctx = uctx;

	coord_pair_request_init(coord_pair->el, request, now, packet_ctx);
	coord_pair_request_name_number(request, reg->name);

	unlang_interpret_set(request, coord_pair->intp);

	return request;
}
374
375
/** Push a bootstrapped request into the virtual server and queue it
 *
 * Copies the packet type attribute from the request pairs into the packet
 * code, pushes the registration's virtual server onto the request, then
 * starts time tracking (which makes the request runnable).
 *
 * On any failure the request is released back to the slab.
 *
 * @param[in] coord_pair	which will run the request.
 * @param[in] request		to start.
 * @param[in] now		passed to time tracking.
 */
static void coord_pair_request_start(fr_coord_pair_t *coord_pair, request_t *request, fr_time_t now)
{
	fr_coord_pair_reg_t	*reg = coord_pair->coord_pair_reg;
	fr_pair_t		*vp;

	vp = fr_pair_find_by_da(&request->request_pairs, NULL, reg->attr_packet_type);
	if (!vp) {
		RERROR("Missing %s attribute", reg->attr_packet_type->name);
		goto release;
	}

	request->packet->code = vp->vp_uint32;

	if (virtual_server_push(NULL, request, reg->vs, UNLANG_TOP_FRAME) < 0) {
		RERROR("Protocol failed to set 'process' function");
		goto release;
	}

	if (unlikely(coord_pair_request_time_tracking_start(coord_pair, request, now) < 0)) {
		RERROR("Failed to start request time tracking");
		goto release;
	}

	return;

release:
	request_slab_release(request);
}
399
400
/** Initialise a request generated by the interpreter
 *
 * Attaches fresh async state to the request and starts time tracking,
 * which also queues the request as runnable.
 *
 * @param[in] request	to initialise; must already have packet and reply.
 * @param[in] uctx	the fr_coord_pair_t the interpreter belongs to.
 */
static void _coord_pair_request_internal_init(request_t *request, void *uctx)
{
	fr_coord_pair_t	*coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
	fr_time_t	now = fr_time();

	fr_assert(request->packet);
	fr_assert(request->reply);

	request->packet->timestamp = now;

	/*
	 *	Checked with MEM() as other allocations in this file are,
	 *	instead of dereferencing a possibly NULL pointer below.
	 */
	MEM(request->async = talloc_zero(request, fr_async_t));
	request->async->recv_time = now;
	request->async->el = coord_pair->el;
	fr_dlist_entry_init(&request->async->entry);

	/*
	 *	Requests generated by the interpreter
	 *	are always marked up as internal.
	 */
	fr_assert(request_is_internal(request));

	/* Failure is logged by the callee; this callback has no way to report it */
	(void) coord_pair_request_time_tracking_start(coord_pair, request, now);
}
421
422
/** External request is now complete - will never happen with coordinators
423
 *
424
 */
425
static void _coord_pair_request_done_external(UNUSED request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
426
0
{
427
0
  fr_assert(0);
428
0
}
429
430
/** Internal request (i.e. one generated by the interpreter) is now complete
431
 *
432
 * Whatever generated the request is now responsible for freeing it.
433
 */
434
static void _coord_pair_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
435
0
{
436
0
  fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
437
438
0
  coord_pair_request_time_tracking_end(coord_pair, request, fr_time());
439
440
0
  fr_assert(!fr_heap_entry_inserted(request->runnable));
441
0
  fr_assert(!fr_timer_armed(request->timeout));
442
0
  fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
443
0
}
444
445
/** Detached request (i.e. one generated by the interpreter with no parent) is now complete
446
 *
447
 * As the request has no parent, then there's nothing to free it
448
 * so we have to.
449
 */
450
static void _coord_pair_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
451
0
{
452
0
  fr_assert(!fr_heap_entry_inserted(request->runnable));
453
454
0
  TALLOC_FREE(request->timeout);
455
456
0
  fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
457
458
0
  talloc_free(request);
459
0
}
460
461
/** Make us responsible for running the request
462
 *
463
 */
464
static void _coord_pair_request_detach(request_t *request, void *uctx)
465
0
{
466
0
  fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
467
468
0
  RDEBUG4("%s - Request detaching", __FUNCTION__);
469
470
0
  if (request_is_detachable(request)) {
471
    /*
472
    * End the time tracking...  We don't track detached requests,
473
    * because they don't contribute for the time consumed by an
474
    * external request.
475
    */
476
0
    if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
477
0
      RDEBUG3("Forcing time tracking to running state, from yielded, for request detach");
478
0
      fr_time_tracking_resume(&request->async->tracking, fr_time());
479
0
    }
480
0
    coord_pair_request_time_tracking_end(coord_pair, request, fr_time());
481
482
0
    if (request_detach(request) < 0) RPEDEBUG("Failed detaching request");
483
484
0
    RDEBUG3("Request is detached");
485
0
  } else {
486
0
    fr_assert_msg(0, "Request is not detachable");
487
0
  }
488
0
}
489
490
/** Request is now runnable
491
 *
492
 */
493
static void _coord_pair_request_runnable(request_t *request, void *uctx)
494
0
{
495
0
  fr_coord_pair_t *coord_pair = uctx;
496
497
0
  RDEBUG4("%s - Request marked as runnable", __FUNCTION__);
498
0
  fr_heap_insert(&coord_pair->runnable, request);
499
0
}
500
501
/** Interpreter yielded request
502
 *
503
 */
504
static void _coord_pair_request_yield(request_t *request, UNUSED void *uctx)
505
0
{
506
0
  RDEBUG4("%s - Request yielded", __FUNCTION__);
507
0
  if (likely(!request_is_detached(request))) fr_time_tracking_yield(&request->async->tracking, fr_time());
508
0
}
509
510
/** Interpreter is starting to work on request again
511
 *
512
 */
513
static void _coord_pair_request_resume(request_t *request, UNUSED void *uctx)
514
0
{
515
0
  RDEBUG4("%s - Request resuming", __FUNCTION__);
516
0
  if (likely(!request_is_detached(request))) fr_time_tracking_resume(&request->async->tracking, fr_time());
517
0
}
518
519
/** Check if a request is scheduled
520
 *
521
 */
522
static bool _coord_pair_request_scheduled(request_t const *request, UNUSED void *uctx)
523
0
{
524
0
  return fr_heap_entry_inserted(request->runnable);
525
0
}
526
527
/** Update a request's priority
528
 *
529
 */
530
static void _coord_pair_request_prioritise(request_t *request, void *uctx)
531
0
{
532
0
  fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
533
534
0
  RDEBUG4("%s - Request priority changed", __FUNCTION__);
535
536
  /* Extract the request from the runnable queue _if_ it's in the runnable queue */
537
0
  if (fr_heap_extract(&coord_pair->runnable, request) < 0) return;
538
539
  /* Reinsert it to re-evaluate its new priority */
540
0
  fr_heap_insert(&coord_pair->runnable, request);
541
0
}
542
543
/** Compare two requests by priority and sequence
544
 */
545
static int8_t coord_pair_runnable_cmp(void const *one, void const *two)
546
0
{
547
0
  request_t const *a = one, *b = two;
548
0
  int ret;
549
550
0
  ret = CMP(b->priority, a->priority);
551
0
  if (ret != 0) return ret;
552
553
0
  return CMP(a->sequence, b->sequence);
554
0
}
555
556
0
/** Instance destruction callback for a coord pair
 *
 * Force-runs the request timeout list and logs how many timers ran.
 *
 * @param[in] coord		unused.
 * @param[in] inst		callback instance; inst_data holds the fr_coord_pair_t.
 * @param[in] single_thread	when false, the thread default interpreter is cleared first.
 * @param[in] uctx		unused.
 */
void fr_coord_pair_inst_destroy(UNUSED fr_coord_t *coord, fr_coord_cb_inst_t *inst, bool single_thread, UNUSED void *uctx)
{
	fr_coord_pair_t	*coord_pair = talloc_get_type_abort(inst->inst_data, fr_coord_pair_t);
	unsigned int	count = 0;	/* unsigned, to match the %u conversion below */
	int		ret;

	if (!single_thread) unlang_interpret_set_thread_default(NULL);

	ret = fr_timer_list_force_run(coord_pair->timeout);
	if (unlikely(ret < 0)) {
		fr_assert_msg(0, "Failed to force run the timeout list");
	} else {
		count += (unsigned int)ret;
	}

	DEBUG("Coordinator is exiting - stopped %u requests", count);
}
571
572
/** Create the coord_pair coord instance data
573
 */
574
static fr_coord_pair_t *fr_coord_pair_create(TALLOC_CTX *ctx, fr_coord_t *coord, fr_event_list_t *el,
575
               bool single_thread, void *uctx)
576
0
{
577
0
  fr_coord_pair_t   *coord_pair;
578
0
  fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(uctx, fr_coord_pair_reg_t);
579
580
0
  MEM(coord_pair = talloc(ctx, fr_coord_pair_t));
581
0
  *coord_pair = (fr_coord_pair_t) {
582
0
    .coord = coord,
583
0
    .coord_pair_reg = coord_pair_reg,
584
0
    .el = el
585
0
  };
586
587
0
  coord_pair->runnable = fr_heap_talloc_alloc(coord_pair, coord_pair_runnable_cmp, request_t, runnable, 0);
588
0
  if (!coord_pair->runnable) {
589
0
    fr_strerror_const("Failed creating runnable heap");
590
0
  fail:
591
0
    talloc_free(coord_pair);
592
0
    return NULL;
593
0
  }
594
595
0
  coord_pair->timeout = fr_timer_list_ordered_alloc(coord_pair, el->tl);
596
0
  if (!coord_pair->timeout) {
597
0
    fr_strerror_const("Failed creating timeouts list");
598
0
    goto fail;
599
0
  }
600
601
0
  coord_pair->intp = unlang_interpret_init(coord_pair, el,
602
0
          &(unlang_request_func_t){
603
0
            .init_internal = _coord_pair_request_internal_init,
604
605
0
            .done_external = _coord_pair_request_done_external,
606
0
            .done_internal = _coord_pair_request_done_internal,
607
0
            .done_detached = _coord_pair_request_done_detached,
608
609
0
            .detach = _coord_pair_request_detach,
610
0
            .yield = _coord_pair_request_yield,
611
0
            .resume = _coord_pair_request_resume,
612
0
            .mark_runnable = _coord_pair_request_runnable,
613
614
0
            .scheduled = _coord_pair_request_scheduled,
615
0
            .prioritise = _coord_pair_request_prioritise
616
0
          }, coord_pair);
617
618
0
  if (!coord_pair->intp) goto fail;
619
620
0
  if (!(coord_pair->slab = request_slab_list_alloc(coord_pair, el, &coord_pair_reg->reuse, NULL, NULL,
621
0
               coord_pair, true, false))) {
622
0
    goto fail;
623
0
  }
624
625
0
  if (!single_thread) unlang_interpret_set_thread_default(coord_pair->intp);
626
627
0
  return coord_pair;
628
0
}
629
630
/** Run runnable requests through the interpreter
 *
 * Keeps popping requests from the runnable heap and resuming them until
 * the heap is empty, or at least 1ms has elapsed since start.  The elapsed
 * time is checked before each pop.
 *
 * @param[in] coord_pair	owning the runnable heap.
 * @param[in] start		time this pass began.
 */
static inline CC_HINT(always_inline) void coord_run_request(fr_coord_pair_t *coord_pair, fr_time_t start)
{
	fr_time_t	now = start;

	for (;;) {
		request_t	*request;

		/* Time slice used up - leave whatever remains in the heap */
		if (!fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1))) break;

		request = fr_heap_pop(&coord_pair->runnable);
		if (request == NULL) break;

		REQUEST_VERIFY(request);
		fr_assert(!fr_heap_entry_inserted(request->runnable));

		(void)unlang_interpret(request, UNLANG_REQUEST_RESUME);

		now = fr_time();
	}
}
647
648
/*
649
 *  Pre and post events used in single threaded mode
650
 */
651
652
/** Pre-event callback used in single threaded mode
 *
 * @param[in] now	unused.
 * @param[in] wake	unused.
 * @param[in] uctx	the fr_coord_pair_t to check.
 * @return
 *	- 1 if there is at least one runnable request.
 *	- 0 otherwise.
 */
static int fr_coord_pair_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
{
	fr_coord_pair_t	*coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);

	if (fr_heap_peek(coord_pair->runnable)) return 1;

	return 0;
}
660
661
/** Post-event callback used in single threaded mode
 *
 * Runs runnable requests, starting the time slice at the current time.
 *
 * @param[in] el	unused.
 * @param[in] now	unused; the time is re-read with fr_time().
 * @param[in] uctx	the fr_coord_pair_t to service.
 */
static void fr_coord_pair_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
{
	fr_coord_pair_t	*coord_pair;

	coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
	coord_run_request(coord_pair, fr_time());
}
667
668
/** Event callback in multi threaded mode
669
 */
670
static void fr_coord_pair_event(UNUSED fr_event_list_t *el, void *uctx)
671
0
{
672
0
  fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
673
674
0
  coord_run_request(coord_pair, fr_time());
675
0
}
676
677
/** Callback run when a coordinator receives pair list data
678
 *
679
 * Converts the data into a request.
680
 */
681
void fr_coord_pair_data_recv(UNUSED fr_coord_t *coord, uint32_t worker_id, fr_dbuff_t *dbuff, fr_time_t now, void *inst, void *uctx)
682
0
{
683
0
  fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(uctx, fr_coord_pair_reg_t);
684
0
  fr_coord_pair_t   *coord_pair = talloc_get_type_abort(inst, fr_coord_pair_t);
685
0
  request_t   *request;
686
0
  fr_pair_t   *vp;
687
688
0
  request = coord_pair_request_bootstrap(coord_pair, now, coord_pair_reg);
689
0
  if (!request) return;
690
691
0
  if (fr_pair_append_by_da(request->request_ctx, &vp, &request->request_pairs, attr_worker_id) < 0) {
692
0
  error:
693
0
    request_slab_release(request);
694
0
    return;
695
0
  };
696
0
  vp->vp_int32 = worker_id;
697
698
0
  if (fr_internal_decode_list_dbuff(request->pair_list.request, &request->request_pairs,
699
0
            fr_dict_root(request->proto_dict), dbuff, NULL) < 0) {
700
0
    RERROR("Failed decoding packet");
701
0
    goto error;
702
0
  }
703
704
0
  coord_pair_request_start(coord_pair, request, now);
705
0
}
706
707
/** Callback run when a worker receives pair list data
708
 *
709
 * Finds the packet type attribute in the data and calls the callback
710
 * registered against the value of that attribute.
711
 *
712
 * @param cw  Worker which received the message.
713
 * @param dbuff Data received.
714
 * @param now Time the data is received.
715
 * @param mctx  Module context to pass to callback.
716
 * @param uctx  The coord_pair registration.
717
 */
718
void fr_coord_worker_pair_data_recv(fr_coord_worker_t *cw, fr_dbuff_t *dbuff, fr_time_t now, module_ctx_t *mctx, void *uctx)
719
0
{
720
0
  fr_coord_pair_reg_t   *coord_pair_reg = talloc_get_type_abort(uctx, fr_coord_pair_reg_t);
721
0
  fr_pair_list_t      list;
722
0
  fr_pair_t     *vp;
723
724
0
  fr_pair_list_init(&list);
725
0
  if (fr_internal_decode_list_dbuff(NULL, &list, coord_pair_reg->root, dbuff, NULL) < 0) {
726
0
    PERROR("Failed to decode data as pair list");
727
0
    goto free;
728
0
  }
729
730
0
  vp = fr_pair_find_by_da_nested(&list, NULL, coord_pair_reg->attr_packet_type);
731
732
0
  if (!vp) {
733
0
    ERROR("Message received without %s", coord_pair_reg->attr_packet_type->name);
734
0
    goto free;
735
0
  }
736
737
0
  if (vp->vp_uint32 > coord_pair_reg->max_packet_type || !coord_pair_reg->callbacks[vp->vp_uint32]) {
738
0
    ERROR("Message received with invalid value %pP", vp);
739
0
    goto free;
740
0
  }
741
742
0
  coord_pair_reg->callbacks[vp->vp_uint32]->callback(cw, coord_pair_reg, &list, now, mctx,
743
0
                 coord_pair_reg->callbacks[vp->vp_uint32]->uctx);
744
745
0
free:
746
0
  fr_pair_list_free(&list);
747
0
}
748
749
/** Send a reply list from a coordinator to a worker
750
 *
751
 * @param request containing the reply to send.
752
 * @param worker_id to send the reply to.
753
 * @return
754
 *  - 0 on success
755
 *  - -1 on failure
756
 */
757
int fr_coord_to_worker_reply_send(request_t *request, uint32_t worker_id)
758
0
{
759
0
  fr_dbuff_t    dbuff;
760
0
  fr_dbuff_uctx_talloc_t  tctx;
761
0
  fr_coord_packet_ctx_t *packet_ctx = talloc_get_type_abort(request->async->packet_ctx, fr_coord_packet_ctx_t);
762
0
  fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(packet_ctx->uctx, fr_coord_pair_reg_t);
763
0
  int     ret;
764
765
0
  if (fr_dbuff_init_talloc(NULL, &dbuff, &tctx, 1024, SIZE_MAX) == NULL) return -1;
766
0
  if (fr_internal_encode_list(&dbuff, &request->reply_pairs, NULL) < 0) {
767
0
    fr_dbuff_free_talloc(&dbuff);
768
0
    return -1;
769
0
  }
770
771
0
  ret = fr_coord_to_worker_send(packet_ctx->coord_pair->coord, worker_id, coord_pair_reg->cb_id, &dbuff);
772
773
0
  fr_dbuff_free_talloc(&dbuff);
774
775
0
  return ret;
776
0
}
777
778
/** Send a reply list from a coordinator to all workers
779
 *
780
 * @param request containing the reply to send.
781
 * @return
782
 *  - 0 on success
783
 *  - < 0 on failure
784
 */
785
int fr_coord_to_worker_reply_broadcast(request_t *request)
786
0
{
787
0
  fr_dbuff_t    dbuff;
788
0
  fr_dbuff_uctx_talloc_t  tctx;
789
0
  fr_coord_packet_ctx_t *packet_ctx = talloc_get_type_abort(request->async->packet_ctx, fr_coord_packet_ctx_t);
790
0
  fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(packet_ctx->uctx, fr_coord_pair_reg_t);
791
0
  int     ret;
792
793
0
  if (fr_dbuff_init_talloc(NULL, &dbuff, &tctx, 1024, SIZE_MAX) == NULL) return -1;
794
0
  if (fr_internal_encode_list(&dbuff, &request->reply_pairs, NULL) < 0) {
795
0
    fr_dbuff_free_talloc(&dbuff);
796
0
    return -1;
797
0
  }
798
799
0
  ret = fr_coord_to_worker_broadcast(packet_ctx->coord_pair->coord, coord_pair_reg->cb_id, &dbuff);
800
801
0
  fr_dbuff_free_talloc(&dbuff);
802
803
0
  return ret;
804
0
}
805
806
/** Send a pair list from a worker to a coordinator
807
 *
808
 * The pair list must include an attribute indicating the packet type
809
 *
810
 * @param cw  The coord worker sending the data.
811
 * @param coord_pair_reg  The coord_pair registration to use.
812
 * @param list  of pairs to send.
813
 * @return
814
 *  - 0 on success
815
 *  - -1 on failure
816
 */
817
int fr_worker_to_coord_pair_send(fr_coord_worker_t *cw, fr_coord_pair_reg_t *coord_pair_reg, fr_pair_list_t *list)
818
0
{
819
0
  fr_dbuff_t    dbuff;
820
0
  fr_dbuff_uctx_talloc_t  tctx;
821
0
  int     ret;
822
823
0
  if (fr_dbuff_init_talloc(NULL, &dbuff, &tctx, 1024, SIZE_MAX) == NULL) return -1;
824
0
  if (fr_internal_encode_list(&dbuff, list, NULL) < 0) {
825
0
    fr_dbuff_free_talloc(&dbuff);
826
0
    return -1;
827
0
  }
828
829
0
  ret = fr_worker_to_coord_send(cw, coord_pair_reg->cb_id, &dbuff);
830
831
0
  fr_dbuff_free_talloc(&dbuff);
832
0
  return ret;
833
0
}
834
835
/** Instance creation called during coordinator creation.
836
 *
837
 * @param ctx   to allocate the instance in.
838
 * @param coord   Coordinator to create an instance of.
839
 * @param el    Event list for instance to use.
840
 * @param single_thread is the server in single thread mode.
841
 * @param uctx    configured for the callback this instance relates to.
842
 * @return
843
 *  - fr_coord_cb_inst_t on success
844
 *  - NULL on failure
845
 */
846
fr_coord_cb_inst_t *fr_coord_pair_inst_create(TALLOC_CTX *ctx, fr_coord_t *coord, fr_event_list_t *el,
847
                 bool single_thread, void *uctx)
848
0
{
849
0
  fr_coord_cb_inst_t  *cb_inst;
850
0
  fr_coord_pair_t   *coord_pair;
851
852
0
  MEM(cb_inst = talloc(ctx, fr_coord_cb_inst_t));
853
854
0
  *cb_inst = (fr_coord_cb_inst_t) {
855
0
    .event_pre_cb = fr_coord_pair_pre_event,
856
0
    .event_post_cb = fr_coord_pair_post_event,
857
0
    .event_cb = fr_coord_pair_event
858
0
  };
859
860
0
  coord_pair = fr_coord_pair_create(ctx, coord, el, single_thread, uctx);
861
0
  if (!coord_pair) {
862
0
    talloc_free(cb_inst);
863
0
    return NULL;
864
0
  }
865
866
0
  cb_inst->inst_data = coord_pair;
867
868
0
  return cb_inst;
869
0
}
870
871
/** Return the coord_pair associated with a coord_pair internal request
872
 *
873
 * @param request to fetch associated coordinator for.
874
 * @return fr_coord_t
875
 */
876
fr_coord_pair_t *fr_coord_pair_request_coord_pair(request_t *request)
877
0
{
878
0
  fr_coord_packet_ctx_t *packet_ctx = talloc_get_type_abort(request->async->packet_ctx, fr_coord_packet_ctx_t);
879
0
  return packet_ctx->coord_pair;
880
0
}
881
882
/** Start a coordinator request to run through a coord_pair process module
883
 *
884
 * @param coord_pair  with the process module to run the request.
885
 * @param list    Pairs to populate the request.
886
 * @param now   Request time.
887
 * @return
888
 *  0 on success
889
 *  -1 on failure
890
 */
891
int fr_coord_pair_coord_request_start(fr_coord_pair_t *coord_pair, fr_pair_list_t *list, fr_time_t now)
892
0
{
893
0
  request_t *request;
894
895
0
  request = coord_pair_request_bootstrap(coord_pair, now, coord_pair->coord_pair_reg);
896
0
  if (!request) return -1;
897
898
0
  fr_pair_list_steal(request->request_ctx, list);
899
0
  fr_pair_list_append(&request->request_pairs, list);
900
901
0
  coord_pair_request_start(coord_pair, request, now);
902
0
  return 0;
903
0
}