/src/freeradius-server/src/lib/io/control.c
Line | Count | Source |
1 | | /* |
2 | | * This program is free software; you can redistribute it and/or modify |
3 | | * it under the terms of the GNU General Public License as published by |
4 | | * the Free Software Foundation; either version 2 of the License, or |
5 | | * (at your option) any later version. |
6 | | * |
7 | | * This program is distributed in the hope that it will be useful, |
8 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | | * GNU General Public License for more details. |
11 | | * |
12 | | * You should have received a copy of the GNU General Public License |
13 | | * along with this program; if not, write to the Free Software |
14 | | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA |
15 | | */ |
16 | | |
17 | | /** |
18 | | * $Id: c133ceaca5bce535f3de3800c578b6d6b51bba5f $ |
19 | | * |
20 | | * @brief Control-plane signaling |
21 | | * @file io/control.c |
22 | | * |
23 | | * @copyright 2016 Alan DeKok (aland@freeradius.org) |
24 | | */ |
25 | | RCSID("$Id: c133ceaca5bce535f3de3800c578b6d6b51bba5f $") |
26 | | |
27 | | #include <freeradius-devel/io/control.h> |
28 | | #include <freeradius-devel/util/strerror.h> |
29 | | #include <freeradius-devel/util/syserror.h> |
30 | | #include <freeradius-devel/util/misc.h> |
31 | | #include <freeradius-devel/util/rand.h> |
32 | | |
33 | | #include <fcntl.h> |
34 | | #include <sys/event.h> |
35 | | #include <poll.h> |
36 | | |
/*
 *	Optional debug tracing, mainly useful when running channel_test.
 *
 *	Change the condition below to 1 to have MPRINT() write to stderr.
 *	Otherwise the macro expands to nothing and its arguments are not
 *	evaluated.
 */
#if 0
#  define MPRINT(...) fprintf(stderr, __VA_ARGS__)
#else
#  define MPRINT(...)
#endif
45 | | |
/**
 * Life-cycle state of a control message held in a ring buffer
 */
typedef enum fr_control_message_status_t {
	FR_CONTROL_MESSAGE_FREE = 0,			//!< the slot has been released by garbage collection
	FR_CONTROL_MESSAGE_USED = 1,			//!< the message is in flight (set when it is allocated)
	FR_CONTROL_MESSAGE_DONE = 2			//!< the message is finished with, and may be garbage collected
} fr_control_message_status_t;
54 | | |
55 | | |
56 | | /** |
57 | | * The header for the control message |
58 | | */ |
59 | | typedef struct { |
60 | | fr_control_message_status_t status; //!< status of this message |
61 | | uint32_t id; //!< ID of this message |
62 | | size_t data_size; //!< size of the data we're sending |
63 | | } fr_control_message_t; |
64 | | |
65 | | |
66 | | typedef struct { |
67 | | uint32_t id; //!< id of this callback |
68 | | void *ctx; //!< context for the callback |
69 | | fr_control_callback_t callback; //!< the function to call |
70 | | } fr_control_ctx_t; |
71 | | |
72 | | |
73 | | /** |
74 | | * The control structure. |
75 | | */ |
76 | | struct fr_control_s { |
77 | | fr_event_list_t *el; //!< our event list |
78 | | |
79 | | fr_atomic_queue_t *aq; //!< destination AQ |
80 | | |
81 | | int pipe[2]; //!< our pipes |
82 | | |
83 | | bool same_thread; //!< are the two ends in the same thread |
84 | | |
85 | | bool opened; //!< has the control path been opened. |
86 | | |
87 | | uint32_t num_callbacks; //!< the size of the callback array |
88 | | |
89 | | fr_control_ctx_t type[]; //!< callbacks. Must be at the end of the structure as these |
90 | | ///< are created by allocating a larger memory size. |
91 | | }; |
92 | | |
93 | | static void pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx) |
94 | 0 | { |
95 | 0 | fr_control_t *c = talloc_get_type_abort(uctx, fr_control_t); |
96 | 0 | fr_time_t now; |
97 | 0 | char read_buffer[256]; |
98 | 0 | uint8_t data[256]; |
99 | 0 | size_t message_size; |
100 | 0 | uint32_t id = 0; |
101 | | |
102 | | /* |
103 | | * The presence of data on the pipe fd is just a trigger to pop all |
104 | | * available messages from the atomic queue, so the number of bytes |
105 | | * read is not important. |
106 | | */ |
107 | | /* coverity[check_return] */ |
108 | 0 | if (read(fd, read_buffer, sizeof(read_buffer)) <= 0) return; |
109 | | |
110 | 0 | now = fr_time(); |
111 | |
|
112 | 0 | while((message_size = fr_control_message_pop(c->aq, &id, data, sizeof(data)))) { |
113 | 0 | if (id >= c->num_callbacks) continue; |
114 | | |
115 | 0 | if (!c->type[id].callback) continue; |
116 | | |
117 | 0 | c->type[id].callback(c->type[id].ctx, data, message_size, now); |
118 | 0 | } |
119 | 0 | } |
120 | | |
121 | | /** Free a control structure |
122 | | * |
123 | | * This function really only calls the underlying "garbage collect". |
124 | | * |
125 | | * @param[in] c the control structure |
126 | | */ |
127 | | static int _control_free(fr_control_t *c) |
128 | 0 | { |
129 | 0 | (void) talloc_get_type_abort(c, fr_control_t); |
130 | |
|
131 | 0 | #ifndef NDEBUG |
132 | 0 | (void) fr_event_fd_unarmour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c); |
133 | 0 | #endif |
134 | 0 | (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO); |
135 | |
|
136 | 0 | close(c->pipe[0]); |
137 | 0 | close(c->pipe[1]); |
138 | |
|
139 | 0 | return 0; |
140 | 0 | } |
141 | | |
142 | | /** Create a control-plane signaling path. |
143 | | * |
144 | | * @param[in] ctx the talloc context |
145 | | * @param[in] el the event list for the control socket |
146 | | * @param[in] aq the atomic queue where we will be pushing message data |
147 | | * @param[in] num_callbacks the initial number of callback entries to allocate. |
148 | | * @return |
149 | | * - NULL on error |
150 | | * - fr_control_t on success |
151 | | */ |
152 | | fr_control_t *fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks) |
153 | 0 | { |
154 | 0 | fr_control_t *c; |
155 | |
|
156 | 0 | c = talloc_zero_size(ctx, sizeof(fr_control_t) + sizeof(fr_control_ctx_t) * num_callbacks); |
157 | 0 | talloc_set_type(c, fr_control_t); |
158 | 0 | if (!c) { |
159 | 0 | fr_strerror_const("Failed allocating memory"); |
160 | 0 | return NULL; |
161 | 0 | } |
162 | 0 | c->el = el; |
163 | 0 | c->aq = aq; |
164 | 0 | c->num_callbacks = num_callbacks; |
165 | |
|
166 | 0 | return c; |
167 | 0 | } |
168 | | |
169 | | /** Open the control-plane signalling path |
170 | | * |
171 | | * @param[in] c the control-plane to open |
172 | | * @return |
173 | | * - 0 on success |
174 | | * - -1 on failure |
175 | | */ |
176 | | int fr_control_open(fr_control_t *c) |
177 | 0 | { |
178 | 0 | if (pipe(c->pipe) < 0) { |
179 | 0 | fr_strerror_printf("Failed opening pipe for control socket: %s", fr_syserror(errno)); |
180 | 0 | return -1; |
181 | 0 | } |
182 | 0 | talloc_set_destructor(c, _control_free); |
183 | | |
184 | | /* |
185 | | * We don't want reads from the pipe to be blocking. |
186 | | */ |
187 | 0 | (void) fr_nonblock(c->pipe[0]); |
188 | 0 | (void) fr_nonblock(c->pipe[1]); |
189 | 0 | (void) fr_cloexec(c->pipe[0]); |
190 | 0 | (void) fr_cloexec(c->pipe[1]); |
191 | |
|
192 | 0 | if (fr_event_fd_insert(c, NULL, c->el, c->pipe[0], pipe_read, NULL, NULL, c) < 0) { |
193 | 0 | fr_strerror_const_push("Failed adding FD to event list control socket"); |
194 | 0 | return -1; |
195 | 0 | } |
196 | | |
197 | 0 | #ifndef NDEBUG |
198 | 0 | (void) fr_event_fd_armour(c->el, c->pipe[0], FR_EVENT_FILTER_IO, (uintptr_t)c); |
199 | 0 | #endif |
200 | 0 | c->opened = true; |
201 | |
|
202 | 0 | return 0; |
203 | 0 | } |
204 | | |
205 | | /** Clean up messages in a control-plane buffer |
206 | | * |
207 | | * Find the oldest messages which are marked FR_CONTROL_MESSAGE_DONE, |
208 | | * and mark them FR_CONTROL_MESSAGE_FREE. |
209 | | * |
210 | | * @param[in] c the fr_control_t |
211 | | * @param[in] rb the callers ring buffer for message allocation. |
212 | | * @return |
213 | | * - <0 there are still messages used |
214 | | * - 0 the control list is empty. |
215 | | */ |
216 | | int fr_control_gc(UNUSED fr_control_t *c, fr_ring_buffer_t *rb) |
217 | 0 | { |
218 | 0 | while (true) { |
219 | 0 | size_t room, message_size; |
220 | 0 | fr_control_message_t *m; |
221 | |
|
222 | 0 | (void) fr_ring_buffer_start(rb, (uint8_t **) &m, &room); |
223 | 0 | if (room == 0) break; |
224 | | |
225 | 0 | fr_assert(m != NULL); |
226 | 0 | fr_assert(room >= sizeof(*m)); |
227 | |
|
228 | 0 | fr_assert(m->status != FR_CONTROL_MESSAGE_FREE); |
229 | |
|
230 | 0 | if (m->status != FR_CONTROL_MESSAGE_DONE) break; |
231 | | |
232 | 0 | m->status = FR_CONTROL_MESSAGE_FREE; |
233 | | |
234 | | /* |
235 | | * Each message is aligned to a 64-byte boundary, |
236 | | * for cache contention issues. |
237 | | */ |
238 | 0 | message_size = sizeof(*m); |
239 | 0 | message_size += m->data_size; |
240 | 0 | message_size += 63; |
241 | 0 | message_size &= ~(size_t) 63; |
242 | 0 | fr_ring_buffer_free(rb, message_size); |
243 | 0 | } |
244 | | |
245 | | /* |
246 | | * Maybe we failed to garbage collect everything? |
247 | | */ |
248 | 0 | if (fr_ring_buffer_used(rb) > 0) { |
249 | 0 | fr_strerror_const("Data still in control buffers"); |
250 | 0 | return -1; |
251 | 0 | } |
252 | | |
253 | 0 | return 0; |
254 | 0 | } |
255 | | |
256 | | |
257 | | /** Allocate a control message |
258 | | * |
259 | | * @param[in] c the control structure |
260 | | * @param[in] rb the callers ring buffer for message allocation. |
261 | | * @param[in] id the ident of this message. |
262 | | * @param[in] data the data to write to the control plane |
263 | | * @param[in] data_size the size of the data to write to the control plane. |
264 | | * @return |
265 | | * - NULL on error |
266 | | * - fr_message_t on success |
267 | | */ |
268 | | static fr_control_message_t *fr_control_message_alloc(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size) |
269 | 0 | { |
270 | 0 | size_t message_size; |
271 | 0 | fr_control_message_t *m; |
272 | 0 | uint8_t *p; |
273 | |
|
274 | 0 | message_size = sizeof(*m); |
275 | 0 | message_size += data_size; |
276 | 0 | message_size += 63; |
277 | 0 | message_size &= ~(size_t) 63; |
278 | |
|
279 | 0 | m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size); |
280 | 0 | if (!m) { |
281 | 0 | (void) fr_control_gc(c, rb); |
282 | 0 | m = (fr_control_message_t *) fr_ring_buffer_alloc(rb, message_size); |
283 | 0 | if (!m) { |
284 | 0 | fr_strerror_const_push("Failed allocating from ring buffer"); |
285 | 0 | return NULL; |
286 | 0 | } |
287 | 0 | } |
288 | | |
289 | 0 | m->status = FR_CONTROL_MESSAGE_USED; |
290 | 0 | m->id = id; |
291 | 0 | m->data_size = data_size; |
292 | |
|
293 | 0 | p = (uint8_t *) m; |
294 | 0 | memcpy(p + sizeof(*m), data, data_size); |
295 | |
|
296 | 0 | return m; |
297 | |
|
298 | 0 | } |
299 | | |
300 | | |
301 | | /** Push a control-plane message |
302 | | * |
303 | | * This function is called ONLY from the originating threads. |
304 | | * |
305 | | * @param[in] c the control structure |
306 | | * @param[in] rb the callers ring buffer for message allocation. |
307 | | * @param[in] id the ident of this message. |
308 | | * @param[in] data the data to write to the control plane |
309 | | * @param[in] data_size the size of the data to write to the control plane. |
310 | | * @return |
311 | | * - -2 on ring buffer full |
312 | | * - <0 on error |
313 | | * - 0 on success |
314 | | */ |
315 | | int fr_control_message_push(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size) |
316 | 0 | { |
317 | 0 | fr_control_message_t *m; |
318 | |
|
319 | 0 | (void) talloc_get_type_abort(c, fr_control_t); |
320 | |
|
321 | 0 | MPRINT("CONTROL push aq %p\n", c->aq); |
322 | | |
323 | | /* |
324 | | * Get a message. The alloc call attempts garbage collection |
325 | | * if there is not enough free space. |
326 | | */ |
327 | 0 | m = fr_control_message_alloc(c, rb, id, data, data_size); |
328 | 0 | if (!m) { |
329 | 0 | fr_strerror_const_push("Failed allocating after GC"); |
330 | 0 | return -2; |
331 | 0 | } |
332 | | |
333 | 0 | if (!fr_atomic_queue_push(c->aq, m)) { |
334 | 0 | m->status = FR_CONTROL_MESSAGE_DONE; |
335 | 0 | fr_strerror_const("Failed pushing message to atomic queue."); |
336 | 0 | return -1; |
337 | 0 | } |
338 | | |
339 | 0 | return 0; |
340 | 0 | } |
341 | | |
342 | | /** Send a control-plane message |
343 | | * |
344 | | * This function is called ONLY from the originating threads. |
345 | | * |
346 | | * @param[in] c the control structure |
347 | | * @param[in] rb the callers ring buffer for message allocation. |
348 | | * @param[in] id the ident of this message. |
349 | | * @param[in] data the data to write to the control plane |
350 | | * @param[in] data_size the size of the data to write to the control plane. |
351 | | * @return |
352 | | * - <0 on error |
353 | | * - 0 on success |
354 | | */ |
355 | | int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size) |
356 | 0 | { |
357 | 0 | (void) talloc_get_type_abort(c, fr_control_t); |
358 | |
|
359 | 0 | if (c->same_thread) { |
360 | 0 | if (!c->type[id].callback) return -1; |
361 | | |
362 | 0 | c->type[id].callback(c->type[id].ctx, data, data_size, fr_time()); |
363 | 0 | return 0; |
364 | 0 | } |
365 | | |
366 | 0 | if (fr_control_message_push(c, rb, id, data, data_size) < 0) return -1; |
367 | | |
368 | 0 | redo: |
369 | 0 | if (write(c->pipe[1], ".", 1) >= 0) return 0; |
370 | | |
371 | 0 | if (errno == EINTR) goto redo; |
372 | | |
373 | | /* |
374 | | * EAGAIN means that the pipe is full, which means that the other end will eventually |
375 | | * read from it. |
376 | | */ |
377 | 0 | if (errno == EAGAIN) return 0; |
378 | | |
379 | | #if defined(EWOULDBLOCK) && (EWOULDBLOCK != EAGAIN) |
380 | | if (errno == EWOULDBLOCK) return 0; |
381 | | #endif |
382 | | |
383 | | /* |
384 | | * Other error, that's an issue. |
385 | | */ |
386 | 0 | return -1; |
387 | 0 | } |
388 | | |
389 | | |
390 | | /** Pop control-plane message |
391 | | * |
392 | | * This function is called ONLY from the receiving thread. |
393 | | * |
394 | | * @param[in] aq the recipients atomic queue for control-plane messages |
395 | | * @param[out] p_id the ident of this message. |
396 | | * @param[in,out] data where the data is stored |
397 | | * @param[in] data_size the size of the buffer where we store the data. |
398 | | * @return |
399 | | * - <0 the size of the data we need to read the next message |
400 | | * - 0 this kevent is not for us. |
401 | | * - >0 the amount of data we've read |
402 | | */ |
403 | | ssize_t fr_control_message_pop(fr_atomic_queue_t *aq, uint32_t *p_id, void *data, size_t data_size) |
404 | 0 | { |
405 | 0 | uint8_t *p; |
406 | 0 | fr_control_message_t *m; |
407 | |
|
408 | 0 | MPRINT("CONTROL pop aq %p\n", aq); |
409 | |
|
410 | 0 | if (!fr_atomic_queue_pop(aq, (void **) &m)) return 0; |
411 | | |
412 | 0 | fr_assert_msg(m->status == FR_CONTROL_MESSAGE_USED, "Bad control message state, expected %u got %u", |
413 | 0 | FR_CONTROL_MESSAGE_USED, m->status); |
414 | | |
415 | | /* |
416 | | * There isn't enough room to store the data, die. |
417 | | */ |
418 | 0 | if (data_size < m->data_size) { |
419 | 0 | fr_strerror_printf("Allocation size should be at least %zd", m->data_size); |
420 | 0 | return -(m->data_size); |
421 | 0 | } |
422 | | |
423 | 0 | p = (uint8_t *) m; |
424 | 0 | data_size = m->data_size; |
425 | 0 | memcpy(data, p + sizeof(*m), data_size); |
426 | |
|
427 | 0 | m->status = FR_CONTROL_MESSAGE_DONE; |
428 | 0 | *p_id = m->id; |
429 | 0 | return data_size; |
430 | 0 | } |
431 | | |
432 | | |
433 | | /** Register a callback for an ID |
434 | | * |
435 | | * @param[in] c the control structure |
436 | | * @param[in] id the ident of this message. |
437 | | * @param[in] ctx the context for the callback |
438 | | * @param[in] callback the callback function |
439 | | * @return |
440 | | * - <0 on error |
441 | | * - 0 on success |
442 | | */ |
443 | | int fr_control_callback_add(fr_control_t **c, uint32_t id, void *ctx, fr_control_callback_t callback) |
444 | 0 | { |
445 | 0 | fr_control_t *ctrl = talloc_get_type_abort(*c, fr_control_t); |
446 | |
|
447 | 0 | if (ctrl->opened) { |
448 | 0 | fr_strerror_printf("Cannot add callbacks after the control is opened"); |
449 | 0 | return -1; |
450 | 0 | } |
451 | | |
452 | | /* |
453 | | * If there is not enough space in the array of callbacks |
454 | | * re-allocate the fr_control_t with a larger array. |
455 | | */ |
456 | 0 | if (id >= ctrl->num_callbacks) { |
457 | 0 | ctrl = talloc_realloc_size(talloc_parent(*c), *c, sizeof(fr_control_t) + sizeof(fr_control_ctx_t) * (id + 1)); |
458 | 0 | if (!ctrl) { |
459 | 0 | fr_strerror_printf("Failed re-allocating control when registering callback ID %d", id); |
460 | 0 | return -1; |
461 | 0 | } |
462 | 0 | talloc_set_type(ctrl, fr_control_t); |
463 | | |
464 | | /* |
465 | | * Zero the additional callback entries. |
466 | | */ |
467 | 0 | memset((uint8_t *)ctrl + sizeof(fr_control_t) + sizeof(fr_control_ctx_t) * ctrl->num_callbacks, 0, |
468 | 0 | (id + 1 - ctrl->num_callbacks) * sizeof(fr_control_ctx_t)); |
469 | |
|
470 | 0 | ctrl->num_callbacks = id + 1; |
471 | 0 | *c = ctrl; |
472 | 0 | } |
473 | | |
474 | | /* |
475 | | * Re-registering the same thing is OK. |
476 | | */ |
477 | 0 | if ((ctrl->type[id].ctx == ctx) && |
478 | 0 | (ctrl->type[id].callback == callback)) { |
479 | 0 | return 0; |
480 | 0 | } |
481 | | |
482 | 0 | if (ctrl->type[id].callback != NULL) { |
483 | 0 | fr_strerror_const("Callback is already set"); |
484 | 0 | return -1; |
485 | 0 | } |
486 | | |
487 | 0 | ctrl->type[id].id = id; |
488 | 0 | ctrl->type[id].ctx = ctx; |
489 | 0 | ctrl->type[id].callback = callback; |
490 | |
|
491 | 0 | return 0; |
492 | 0 | } |
493 | | |
494 | | /** Delete a callback for an ID |
495 | | * |
496 | | * @param[in] c the control structure |
497 | | * @param[in] id the ident of this message. |
498 | | * @return |
499 | | * - <0 on error |
500 | | * - 0 on success |
501 | | */ |
502 | | int fr_control_callback_delete(fr_control_t *c, uint32_t id) |
503 | 0 | { |
504 | 0 | (void) talloc_get_type_abort(c, fr_control_t); |
505 | |
|
506 | 0 | if (id >= c->num_callbacks) { |
507 | 0 | fr_strerror_printf("Failed deleting unknown ID %d", id); |
508 | 0 | return -1; |
509 | 0 | } |
510 | | |
511 | 0 | if (c->type[id].callback == NULL) return 0; |
512 | | |
513 | 0 | c->type[id].id = 0; |
514 | 0 | c->type[id].ctx = NULL; |
515 | 0 | c->type[id].callback = NULL; |
516 | |
|
517 | 0 | return 0; |
518 | 0 | } |
519 | | |
520 | | int fr_control_same_thread(fr_control_t *c) |
521 | 0 | { |
522 | 0 | c->same_thread = true; |
523 | 0 | (void) fr_event_fd_delete(c->el, c->pipe[0], FR_EVENT_FILTER_IO); |
524 | 0 | close(c->pipe[0]); |
525 | 0 | close(c->pipe[1]); |
526 | | |
527 | | /* |
528 | | * Nothing more to do now that everything is gone. |
529 | | */ |
530 | 0 | talloc_set_destructor(c, NULL); |
531 | |
|
532 | 0 | return 0; |
533 | 0 | } |
534 | | |
535 | | /** Wait for a plane control to become readable |
536 | | * |
537 | | * This is a blocking function so only to be used in rare cases such as waiting |
538 | | * for another thread to complete a task before proceeding. |
539 | | * |
540 | | * @param[in] c the control structure. |
541 | | */ |
542 | | void fr_control_wait(fr_control_t *c) |
543 | 0 | { |
544 | 0 | struct pollfd fd; |
545 | |
|
546 | 0 | fd.fd = c->pipe[0]; |
547 | 0 | fd.events = POLLIN; |
548 | |
|
549 | 0 | (void) poll(&fd, 1, -1); |
550 | 0 | } |