Coverage Report

Created: 2026-04-12 06:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/freeradius-server/src/lib/io/control.c
Line
Count
Source
1
/*
2
 *   This program is free software; you can redistribute it and/or modify
3
 *   it under the terms of the GNU General Public License as published by
4
 *   the Free Software Foundation; either version 2 of the License, or
5
 *   (at your option) any later version.
6
 *
7
 *   This program is distributed in the hope that it will be useful,
8
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 *   GNU General Public License for more details.
11
 *
12
 *   You should have received a copy of the GNU General Public License
13
 *   along with this program; if not, write to the Free Software
14
 *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15
 */
16
17
/**
18
 * $Id: c133ceaca5bce535f3de3800c578b6d6b51bba5f $
19
 *
20
 * @brief Control-plane signaling
21
 * @file io/control.c
22
 *
23
 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24
 */
25
RCSID("$Id: c133ceaca5bce535f3de3800c578b6d6b51bba5f $")
26
27
#include <freeradius-devel/io/control.h>
28
#include <freeradius-devel/util/strerror.h>
29
#include <freeradius-devel/util/syserror.h>
30
#include <freeradius-devel/util/misc.h>
31
#include <freeradius-devel/util/rand.h>
32
33
#include <fcntl.h>
34
#include <sys/event.h>
35
#include <poll.h>
36
37
/*
 *  Optional debug tracing, mainly useful for channel_test.
 *
 *  Change the "#if 0" below to "#if 1" to have MPRINT() write to stderr.
 *  Otherwise MPRINT() expands to nothing.
 */
#if 0
#define MPRINT(...) fprintf(stderr, __VA_ARGS__)
#else
#define MPRINT(...)
#endif
45
46
/** Lifecycle state of a control message
 *
 *  A message moves FREE -> USED -> DONE -> FREE.
 */
typedef enum fr_control_message_status_t {
	FR_CONTROL_MESSAGE_FREE = 0,	//!< the slot is free (set by fr_control_gc())
	FR_CONTROL_MESSAGE_USED,	//!< the message is in flight (set by the originator on allocation)
	FR_CONTROL_MESSAGE_DONE		//!< the message may be reclaimed (set by the receiver once it
					///< has been popped, or by the originator if the push failed)
} fr_control_message_status_t;
54
55
56
/**
57
 *  The header for the control message
58
 */
59
typedef struct {
60
  fr_control_message_status_t status;   //!< status of this message
61
  uint32_t      id;   //!< ID of this message
62
  size_t        data_size;      //!< size of the data we're sending
63
} fr_control_message_t;
64
65
66
typedef struct {
67
  uint32_t      id;   //!< id of this callback
68
  void        *ctx;   //!< context for the callback
69
  fr_control_callback_t   callback; //!< the function to call
70
} fr_control_ctx_t;
71
72
73
/**
74
 *  The control structure.
75
 */
76
struct fr_control_s {
77
  fr_event_list_t   *el;      //!< our event list
78
79
  fr_atomic_queue_t *aq;      //!< destination AQ
80
81
  int     pipe[2];          //!< our pipes
82
83
  bool      same_thread;    //!< are the two ends in the same thread
84
85
  bool      opened;     //!< has the control path been opened.
86
87
  uint32_t    num_callbacks;    //!< the size of the callback array
88
89
  fr_control_ctx_t  type[];     //!< callbacks.  Must be at the end of the structure as these
90
              ///< are created by allocating a larger memory size.
91
};
92
93
/** Drain the atomic queue when the control pipe becomes readable
 *
 * @param[in] el	the event list (unused).
 * @param[in] fd	the read end of the control pipe.
 * @param[in] flags	event flags (unused).
 * @param[in] uctx	the fr_control_t which owns the pipe.
 */
static void pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
{
	fr_control_t	*c = talloc_get_type_abort(uctx, fr_control_t);
	fr_time_t	now;
	char		read_buffer[256];
	uint8_t		data[256];
	ssize_t		message_size;
	uint32_t	id = 0;

	/*
	 *  The presence of data on the pipe fd is just a trigger to pop all
	 *  available messages from the atomic queue, so the number of bytes
	 *  read is not important.
	 */
	/* coverity[check_return] */
	if (read(fd, read_buffer, sizeof(read_buffer)) <= 0) return;

	now = fr_time();

	/*
	 *  fr_control_message_pop() returns ssize_t.  It must be kept in a
	 *  signed variable, otherwise the negative "buffer too small" return
	 *  turns into a huge length.
	 */
	while ((message_size = fr_control_message_pop(c->aq, &id, data, sizeof(data))) != 0) {
		/*
		 *  The message did not fit into data[].  Nothing was copied
		 *  and "id" was not updated, so there is nothing valid to
		 *  hand to a callback.  Skip it and keep draining.
		 */
		if (message_size < 0) continue;

		if (id >= c->num_callbacks) continue;

		if (!c->type[id].callback) continue;

		c->type[id].callback(c->type[id].ctx, data, (size_t) message_size, now);
	}
}
120
121
/** Free a control structure
122
 *
123
 *  This function really only calls the underlying "garbage collect".
124
 *
125
 * @param[in] c the control structure
126
 */
127
static int _control_free(fr_control_t *c)
128
0
{
129
0
  (void) talloc_get_type_abort(c, fr_control_t);
130
131
0
#ifndef NDEBUG
132
0
  (void) fr_event_fd_unarmour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
133
0
#endif
134
0
  (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO);
135
136
0
  close(c->pipe[0]);
137
0
  close(c->pipe[1]);
138
139
0
  return 0;
140
0
}
141
142
/** Create a control-plane signaling path.
143
 *
144
 * @param[in] ctx the talloc context
145
 * @param[in] el the event list for the control socket
146
 * @param[in] aq the atomic queue where we will be pushing message data
147
 * @param[in] num_callbacks the initial number of callback entries to allocate.
148
 * @return
149
 *  - NULL on error
150
 *  - fr_control_t on success
151
 */
152
fr_control_t *fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks)
153
0
{
154
0
  fr_control_t *c;
155
156
0
  c = talloc_zero_size(ctx, sizeof(fr_control_t) + sizeof(fr_control_ctx_t) * num_callbacks);
157
0
  talloc_set_type(c, fr_control_t);
158
0
  if (!c) {
159
0
    fr_strerror_const("Failed allocating memory");
160
0
    return NULL;
161
0
  }
162
0
  c->el = el;
163
0
  c->aq = aq;
164
0
  c->num_callbacks = num_callbacks;
165
166
0
  return c;
167
0
}
168
169
/** Open the control-plane signalling path
170
 *
171
 * @param[in] c the control-plane to open
172
 * @return
173
 *  - 0 on success
174
 *  - -1 on failure
175
 */
176
int fr_control_open(fr_control_t *c)
177
0
{
178
0
  if (pipe(c->pipe) < 0) {
179
0
    fr_strerror_printf("Failed opening pipe for control socket: %s", fr_syserror(errno));
180
0
    return -1;
181
0
  }
182
0
  talloc_set_destructor(c, _control_free);
183
184
  /*
185
   *  We don't want reads from the pipe to be blocking.
186
   */
187
0
  (void) fr_nonblock(c->pipe[0]);
188
0
  (void) fr_nonblock(c->pipe[1]);
189
0
  (void) fr_cloexec(c->pipe[0]);
190
0
  (void) fr_cloexec(c->pipe[1]);
191
192
0
  if (fr_event_fd_insert(c, NULL, c->el, c->pipe[0], pipe_read, NULL, NULL, c) < 0) {
193
0
    fr_strerror_const_push("Failed adding FD to event list control socket");
194
0
    return -1;
195
0
  }
196
197
0
#ifndef NDEBUG
198
0
  (void) fr_event_fd_armour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
199
0
#endif
200
0
  c->opened = true;
201
202
0
  return 0;
203
0
}
204
205
/** Clean up messages in a control-plane buffer
206
 *
207
 *  Find the oldest messages which are marked FR_CONTROL_MESSAGE_DONE,
208
 *  and mark them FR_CONTROL_MESSAGE_FREE.
209
 *
210
 * @param[in] c the fr_control_t
211
 * @param[in] rb the callers ring buffer for message allocation.
212
 * @return
213
 *  - <0 there are still messages used
214
 *  - 0 the control list is empty.
215
 */
216
int fr_control_gc(UNUSED fr_control_t *c, fr_ring_buffer_t *rb)
217
0
{
218
0
  while (true) {
219
0
    size_t room, message_size;
220
0
    fr_control_message_t *m;
221
222
0
    (void) fr_ring_buffer_start(rb, (uint8_t **) &m, &room);
223
0
    if (room == 0) break;
224
225
0
    fr_assert(m != NULL);
226
0
    fr_assert(room >= sizeof(*m));
227
228
0
    fr_assert(m->status != FR_CONTROL_MESSAGE_FREE);
229
230
0
    if (m->status != FR_CONTROL_MESSAGE_DONE) break;
231
232
0
    m->status = FR_CONTROL_MESSAGE_FREE;
233
234
    /*
235
     *  Each message is aligned to a 64-byte boundary,
236
     *  for cache contention issues.
237
     */
238
0
    message_size = sizeof(*m);
239
0
    message_size += m->data_size;
240
0
    message_size += 63;
241
0
    message_size &= ~(size_t) 63;
242
0
    fr_ring_buffer_free(rb, message_size);
243
0
  }
244
245
  /*
246
   *  Maybe we failed to garbage collect everything?
247
   */
248
0
  if (fr_ring_buffer_used(rb) > 0) {
249
0
    fr_strerror_const("Data still in control buffers");
250
0
    return -1;
251
0
  }
252
253
0
  return 0;
254
0
}
255
256
257
/** Allocate a control message
258
 *
259
 * @param[in] c the control structure
260
 * @param[in] rb the callers ring buffer for message allocation.
261
 * @param[in] id the ident of this message.
262
 * @param[in] data the data to write to the control plane
263
 * @param[in] data_size the size of the data to write to the control plane.
264
 * @return
265
 *  - NULL on error
266
 *  - fr_message_t on success
267
 */
268
static fr_control_message_t *fr_control_message_alloc(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
269
0
{
270
0
  size_t message_size;
271
0
  fr_control_message_t *m;
272
0
  uint8_t *p;
273
274
0
  message_size = sizeof(*m);
275
0
  message_size += data_size;
276
0
  message_size += 63;
277
0
  message_size &= ~(size_t) 63;
278
279
0
  m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size);
280
0
  if (!m) {
281
0
    (void) fr_control_gc(c, rb);
282
0
    m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size);
283
0
    if (!m) {
284
0
      fr_strerror_const_push("Failed allocating from ring buffer");
285
0
      return NULL;
286
0
    }
287
0
  }
288
289
0
  m->status = FR_CONTROL_MESSAGE_USED;
290
0
  m->id = id;
291
0
  m->data_size = data_size;
292
293
0
  p = (uint8_t *) m;
294
0
  memcpy(p + sizeof(*m), data, data_size);
295
296
0
  return m;
297
298
0
}
299
300
301
/** Push a control-plane message
302
 *
303
 *  This function is called ONLY from the originating threads.
304
 *
305
 * @param[in] c the control structure
306
 * @param[in] rb the callers ring buffer for message allocation.
307
 * @param[in] id the ident of this message.
308
 * @param[in] data the data to write to the control plane
309
 * @param[in] data_size the size of the data to write to the control plane.
310
 * @return
311
 *  - -2 on ring buffer full
312
 *  - <0 on error
313
 *  - 0 on success
314
 */
315
int fr_control_message_push(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
316
0
{
317
0
  fr_control_message_t *m;
318
319
0
  (void) talloc_get_type_abort(c, fr_control_t);
320
321
0
  MPRINT("CONTROL push aq %p\n", c->aq);
322
323
  /*
324
   *  Get a message.  The alloc call attempts garbage collection
325
   *  if there is not enough free space.
326
   */
327
0
  m = fr_control_message_alloc(c, rb, id, data, data_size);
328
0
  if (!m) {
329
0
    fr_strerror_const_push("Failed allocating after GC");
330
0
    return -2;
331
0
  }
332
333
0
  if (!fr_atomic_queue_push(c->aq, m)) {
334
0
    m->status = FR_CONTROL_MESSAGE_DONE;
335
0
    fr_strerror_const("Failed pushing message to atomic queue.");
336
0
    return -1;
337
0
  }
338
339
0
  return 0;
340
0
}
341
342
/** Send a control-plane message
343
 *
344
 *  This function is called ONLY from the originating threads.
345
 *
346
 * @param[in] c the control structure
347
 * @param[in] rb the callers ring buffer for message allocation.
348
 * @param[in] id the ident of this message.
349
 * @param[in] data the data to write to the control plane
350
 * @param[in] data_size the size of the data to write to the control plane.
351
 * @return
352
 *  - <0 on error
353
 *  - 0 on success
354
 */
355
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
356
0
{
357
0
  (void) talloc_get_type_abort(c, fr_control_t);
358
359
0
  if (c->same_thread) {
360
0
    if (!c->type[id].callback) return -1;
361
362
0
    c->type[id].callback(c->type[id].ctx, data, data_size, fr_time());
363
0
    return 0;
364
0
  }
365
366
0
  if (fr_control_message_push(c, rb, id, data, data_size) < 0) return -1;
367
368
0
redo:
369
0
  if (write(c->pipe[1], ".", 1) >= 0) return 0;
370
371
0
  if (errno == EINTR) goto redo;
372
373
  /*
374
   *  EAGAIN means that the pipe is full, which means that the other end will eventually
375
   *  read from it.
376
   */
377
0
  if (errno == EAGAIN) return 0;
378
379
#if defined(EWOULDBLOCK) && (EWOULDBLOCK != EAGAIN)
380
  if (errno == EWOULDBLOCK) return 0;
381
#endif
382
383
  /*
384
   *  Other error, that's an issue.
385
   */
386
0
  return -1;
387
0
}
388
389
390
/** Pop control-plane message
391
 *
392
 *  This function is called ONLY from the receiving thread.
393
 *
394
 * @param[in] aq the recipients atomic queue for control-plane messages
395
 * @param[out] p_id the ident of this message.
396
 * @param[in,out] data where the data is stored
397
 * @param[in] data_size the size of the buffer where we store the data.
398
 * @return
399
 *  - <0 the size of the data we need to read the next message
400
 *  - 0 this kevent is not for us.
401
 *  - >0 the amount of data we've read
402
 */
403
ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size)
404
0
{
405
0
  uint8_t *p;
406
0
  fr_control_message_t *m;
407
408
0
  MPRINT("CONTROL pop aq %p\n", aq);
409
410
0
  if (!fr_atomic_queue_pop(aq, (void **) &m)) return 0;
411
412
0
  fr_assert_msg(m->status == FR_CONTROL_MESSAGE_USED, "Bad control message state, expected %u got %u",
413
0
          FR_CONTROL_MESSAGE_USED, m->status);
414
415
  /*
416
   *  There isn't enough room to store the data, die.
417
   */
418
0
  if (data_size < m->data_size) {
419
0
    fr_strerror_printf("Allocation size should be at least %zd", m->data_size);
420
0
    return -(m->data_size);
421
0
  }
422
423
0
  p = (uint8_t *) m;
424
0
  data_size = m->data_size;
425
0
  memcpy(data, p + sizeof(*m), data_size);
426
427
0
  m->status = FR_CONTROL_MESSAGE_DONE;
428
0
  *p_id = m->id;
429
0
  return data_size;
430
0
}
431
432
433
/** Register a callback for an ID
434
 *
435
 * @param[in] c the control structure
436
 * @param[in] id the ident of this message.
437
 * @param[in] ctx the context for the callback
438
 * @param[in] callback the callback function
439
 * @return
440
 *  - <0 on error
441
 *  - 0 on success
442
 */
443
int fr_control_callback_add(fr_control_t **c, uint32_t id, void *ctx, fr_control_callback_t callback)
444
0
{
445
0
  fr_control_t  *ctrl = talloc_get_type_abort(*c, fr_control_t);
446
447
0
  if (ctrl->opened) {
448
0
    fr_strerror_printf("Cannot add callbacks after the control is opened");
449
0
    return -1;
450
0
  }
451
452
  /*
453
   *  If there is not enough space in the array of callbacks
454
   *  re-allocate the fr_control_t with a larger array.
455
   */
456
0
  if (id >= ctrl->num_callbacks) {
457
0
    ctrl = talloc_realloc_size(talloc_parent(*c), *c, sizeof(fr_control_t) + sizeof(fr_control_ctx_t) * (id + 1));
458
0
    if (!ctrl) {
459
0
      fr_strerror_printf("Failed re-allocating control when registering callback ID %d", id);
460
0
      return -1;
461
0
    }
462
0
    talloc_set_type(ctrl, fr_control_t);
463
464
    /*
465
     *  Zero the additional callback entries.
466
     */
467
0
    memset((uint8_t *)ctrl + sizeof(fr_control_t) + sizeof(fr_control_ctx_t) * ctrl->num_callbacks, 0,
468
0
           (id + 1 - ctrl->num_callbacks) * sizeof(fr_control_ctx_t));
469
470
0
    ctrl->num_callbacks = id + 1;
471
0
    *c = ctrl;
472
0
  }
473
474
  /*
475
   *  Re-registering the same thing is OK.
476
   */
477
0
  if ((ctrl->type[id].ctx == ctx) &&
478
0
      (ctrl->type[id].callback == callback)) {
479
0
    return 0;
480
0
  }
481
482
0
  if (ctrl->type[id].callback != NULL) {
483
0
    fr_strerror_const("Callback is already set");
484
0
    return -1;
485
0
  }
486
487
0
  ctrl->type[id].id = id;
488
0
  ctrl->type[id].ctx = ctx;
489
0
  ctrl->type[id].callback = callback;
490
491
0
  return 0;
492
0
}
493
494
/** Delete a callback for an ID
495
 *
496
 * @param[in] c the control structure
497
 * @param[in] id the ident of this message.
498
 * @return
499
 *  - <0 on error
500
 *  - 0 on success
501
 */
502
int fr_control_callback_delete(fr_control_t *c, uint32_t id)
503
0
{
504
0
  (void) talloc_get_type_abort(c, fr_control_t);
505
506
0
  if (id >= c->num_callbacks) {
507
0
    fr_strerror_printf("Failed deleting unknown ID %d", id);
508
0
    return -1;
509
0
  }
510
511
0
  if (c->type[id].callback == NULL) return 0;
512
513
0
  c->type[id].id = 0;
514
0
  c->type[id].ctx = NULL;
515
0
  c->type[id].callback = NULL;
516
517
0
  return 0;
518
0
}
519
520
/** Mark both ends of the control plane as being in the same thread
 *
 *  After this call, fr_control_message_send() calls the registered
 *  callback directly, so the pipe is no longer needed.  If the pipe has
 *  been opened, it is removed from the event list and closed.
 *
 * @param[in] c the control structure
 * @return 0 always.
 */
int fr_control_same_thread(fr_control_t *c)
{
	c->same_thread = true;

	/*
	 *  The pipe only exists once fr_control_open() has been called.
	 *  Before that, pipe[] still holds the zeroes it was allocated
	 *  with, and closing it would close file descriptor 0.
	 */
	if (c->opened) {
		/*
		 *  Same teardown sequence as _control_free(): debug builds
		 *  armour the FD when it is opened, so remove the armour
		 *  before deleting it.
		 */
#ifndef NDEBUG
		(void) fr_event_fd_unarmour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c);
#endif
		(void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO);
		close(c->pipe[0]);
		close(c->pipe[1]);
	}

	/*
	 *  Nothing more to do now that everything is gone.
	 */
	talloc_set_destructor(c, NULL);

	return 0;
}
534
535
/** Wait for a plane control to become readable
536
 *
537
 * This is a blocking function so only to be used in rare cases such as waiting
538
 * for another thread to complete a task before proceeding.
539
 *
540
 * @param[in] c the control structure.
541
 */
542
void fr_control_wait(fr_control_t *c)
543
0
{
544
0
  struct pollfd fd;
545
546
0
  fd.fd = c->pipe[0];
547
0
  fd.events = POLLIN;
548
549
0
  (void) poll(&fd, 1, -1);
550
0
}