/src/freeradius-server/src/lib/io/coord_pair.c
Line | Count | Source |
1 | | /* |
2 | | * This program is free software; you can redistribute it and/or modify |
3 | | * it under the terms of the GNU General Public License as published by |
4 | | * the Free Software Foundation; either version 2 of the License, or |
5 | | * (at your option) any later version. |
6 | | * |
7 | | * This program is distributed in the hope that it will be useful, |
8 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | | * GNU General Public License for more details. |
11 | | * |
12 | | * You should have received a copy of the GNU General Public License |
13 | | * along with this program; if not, write to the Free Software |
14 | | * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA |
15 | | */ |
16 | | |
17 | | /** |
18 | | * $Id: ec9a7b3c72087b3bf84af340fac59b9f8905dc60 $ |
19 | | * |
20 | | * @brief Sending pair lists to and from coordination threads |
21 | | * @file io/coord_pair.c |
22 | | * |
23 | | * @copyright 2026 Network RADIUS SAS (legal@networkradius.com) |
24 | | */ |
25 | | RCSID("$Id: ec9a7b3c72087b3bf84af340fac59b9f8905dc60 $") |
26 | | |
27 | | #include <freeradius-devel/internal/internal.h> |
28 | | #include <freeradius-devel/io/listen.h> |
29 | | #include <freeradius-devel/io/coord_pair.h> |
30 | | #include <freeradius-devel/io/coord_priv.h> |
31 | | #include <freeradius-devel/server/main_config.h> |
32 | | #include <freeradius-devel/unlang/base.h> |
33 | | |
34 | | static _Atomic(uint64_t) request_number = 0; |
35 | | |
36 | | FR_SLAB_TYPES(request, request_t) |
37 | 0 | FR_SLAB_FUNCS(request, request_t) Unexecuted instantiation: coord_pair.c:request_slab_reserve Unexecuted instantiation: coord_pair.c:request_slab_element_init Unexecuted instantiation: coord_pair.c:request_slab_release Unexecuted instantiation: coord_pair.c:request_slab_list_alloc Unexecuted instantiation: coord_pair.c:request_slab_init Unexecuted instantiation: coord_pair.c:_request_slab_cleanup |
38 | | |
39 | | static fr_dlist_head_t *coord_pair_regs = NULL; |
40 | | static module_list_t *coord_pair_modules; |
41 | | static fr_dict_attr_t const *attr_worker_id = NULL; |
42 | | |
43 | | /** Registration of pair list callbacks |
44 | | * |
45 | | */ |
46 | | struct fr_coord_pair_reg_s { |
47 | | char const *name; //!< Name for log / request name. |
48 | | fr_dlist_t entry; //!< Entry in list of pair list registrations |
49 | | fr_dict_attr_t const *attr_packet_type; //!< Attribute containing packet type |
50 | | fr_dict_attr_t const *root; //!< Pair list decoding root attribute |
51 | | fr_coord_worker_pair_cb_reg_t **callbacks; //!< Array of pointers to callbacks |
52 | | uint32_t max_packet_type; //!< Largest valid value for packet type |
53 | | uint32_t cb_id; //!< The coordinator callback ID used for pair list handling |
54 | | fr_time_delta_t max_request_time; //!< Maximum time for coordinator request processing. |
55 | | fr_slab_config_t reuse; //!< Request slab allocation config. |
56 | | virtual_server_t const *vs; //!< Virtual server containing coordinator process sections. |
57 | | }; |
58 | | |
59 | | struct fr_coord_pair_s { |
60 | | fr_coord_t *coord; //!< Coordinator which this coord pair is attached to. |
61 | | fr_coord_pair_reg_t *coord_pair_reg; //!< Registration details for this coord pair |
62 | | fr_event_list_t *el; //!< Event list for interpreter. |
63 | | unlang_interpret_t *intp; //!< Interpreter for running requests. |
64 | | fr_heap_t *runnable; //!< Current runnable requests. |
65 | | |
66 | | fr_timer_list_t *timeout; //!< Track when requests timeout using a dlist. |
67 | | fr_time_delta_t predicted; //!< How long we predict a request will take to execute. |
68 | | fr_time_tracking_t tracking; //!< How much time the coordinator has spent doing things. |
69 | | uint64_t num_active; //!< Number of active requests. |
70 | | request_slab_list_t *slab; //!< slab allocator for request_t |
71 | | }; |
72 | | |
73 | | /** Packet context used when coordinator messages are processed through an interpreter |
74 | | * |
75 | | * Allows access to the coordinator structure and arbitrary data |
76 | | * throughout the state machine. |
77 | | */ |
78 | | typedef struct { |
79 | | fr_coord_pair_t *coord_pair; //!< Coordinator pair this packet is for. |
80 | | void *uctx; //!< Source specific ctx. |
81 | | } fr_coord_packet_ctx_t; |
82 | | |
83 | | /** Conf parser to read slab settings from module config |
84 | | */ |
85 | | static const conf_parser_t request_reuse_config[] = { |
86 | | FR_SLAB_CONFIG_CONF_PARSER |
87 | | CONF_PARSER_TERMINATOR |
88 | | }; |
89 | | |
90 | | /** Remove a coord pair registration from the list when it is freed |
91 | | */ |
92 | | static int _coord_pair_reg_free(fr_coord_pair_reg_t *to_free) |
93 | 0 | { |
94 | 0 | fr_assert(coord_pair_regs); |
95 | |
|
96 | 0 | fr_dlist_remove(coord_pair_regs, to_free); |
97 | | |
98 | | /* If all the registrations are gone, free the list */ |
99 | 0 | if (fr_dlist_num_elements(coord_pair_regs) == 0) { |
100 | 0 | TALLOC_FREE(coord_pair_regs); |
101 | 0 | TALLOC_FREE(coord_pair_modules); |
102 | 0 | } |
103 | 0 | return 0; |
104 | 0 | } |
105 | | |
106 | | /** Register a set of callbacks for pair list based coordinator messages |
107 | | * |
108 | | * Returns a structure to pass as uctx to fr_coord_cb_t using the |
109 | | * macro FR_COORD_PAIR_CB_CTX_SET. |
110 | | * |
111 | | * @param reg_ctx Callback details to register. |
112 | | */ |
113 | | fr_coord_pair_reg_t *fr_coord_pair_register(fr_coord_pair_reg_ctx_t *reg_ctx) |
114 | 0 | { |
115 | 0 | fr_coord_pair_reg_t *coord_pair_reg; |
116 | 0 | fr_coord_worker_pair_cb_reg_t *cb_reg = reg_ctx->worker_cb; |
117 | 0 | CONF_SECTION *cs; |
118 | 0 | CONF_PAIR *cp; |
119 | |
|
120 | 0 | fr_assert(reg_ctx->root); |
121 | | |
122 | | /* Resolve the Worker-Id attribute if not already done */ |
123 | 0 | if (!attr_worker_id) { |
124 | 0 | attr_worker_id = fr_dict_attr_by_name(NULL, fr_dict_root(fr_dict_internal()), "Worker-Id"); |
125 | 0 | if (!attr_worker_id) { |
126 | 0 | ERROR("Failed to resolve Worker-Id attribute"); |
127 | 0 | return NULL; |
128 | 0 | } |
129 | 0 | } |
130 | | |
131 | 0 | if (!coord_pair_regs) { |
132 | 0 | MEM(coord_pair_regs = talloc_zero(NULL, fr_dlist_head_t)); |
133 | 0 | fr_dlist_init(coord_pair_regs, fr_coord_pair_reg_t, entry); |
134 | 0 | MEM(coord_pair_modules = module_list_alloc(NULL, &module_list_type_global, "coord", true)); |
135 | 0 | } |
136 | |
|
137 | 0 | MEM(coord_pair_reg = talloc(coord_pair_regs, fr_coord_pair_reg_t)); |
138 | 0 | *coord_pair_reg = (fr_coord_pair_reg_t) { |
139 | 0 | .name = reg_ctx->name, |
140 | 0 | .root = reg_ctx->root, |
141 | 0 | .cb_id = reg_ctx->cb_id, |
142 | 0 | .max_request_time = fr_time_delta_eq(reg_ctx->max_request_time, fr_time_delta_from_msec(0)) ? |
143 | 0 | main_config->worker.max_request_time : reg_ctx->max_request_time, |
144 | 0 | }; |
145 | |
|
146 | 0 | while (cb_reg->callback) { |
147 | 0 | if (cb_reg->packet_type > coord_pair_reg->max_packet_type) { |
148 | 0 | coord_pair_reg->max_packet_type = cb_reg->packet_type; |
149 | 0 | } |
150 | 0 | cb_reg++; |
151 | 0 | } |
152 | | |
153 | | /* |
154 | | * A sane limit on packet type values to avoid a huge array. |
155 | | * If larger values are needed in the future we can use a folded array. |
156 | | */ |
157 | 0 | fr_assert(coord_pair_reg->max_packet_type <= 256); |
158 | |
|
159 | 0 | MEM(coord_pair_reg->callbacks = talloc_zero_array(coord_pair_reg, fr_coord_worker_pair_cb_reg_t *, |
160 | 0 | coord_pair_reg->max_packet_type + 1)); |
161 | |
|
162 | 0 | cb_reg = reg_ctx->worker_cb; |
163 | 0 | while (cb_reg->callback) { |
164 | 0 | coord_pair_reg->callbacks[cb_reg->packet_type] = cb_reg; |
165 | 0 | cb_reg++; |
166 | 0 | } |
167 | |
|
168 | 0 | cs = cf_section_find(reg_ctx->cs, "reuse", NULL); |
169 | | |
170 | | /* |
171 | | * Create an empty "reuse" section if one is not found, so defaults are applied |
172 | | */ |
173 | 0 | if (!cs) { |
174 | 0 | cs = cf_section_alloc(reg_ctx->cs, reg_ctx->cs, "reuse", NULL); |
175 | 0 | } |
176 | |
|
177 | 0 | if (cf_section_rules_push(cs, request_reuse_config) < 0) { |
178 | 0 | fail: |
179 | 0 | talloc_free(coord_pair_reg); |
180 | 0 | return NULL; |
181 | 0 | } |
182 | 0 | if (cf_section_parse(coord_pair_reg, &coord_pair_reg->reuse, cs) < 0) goto fail; |
183 | | |
184 | | /* |
185 | | * Set defaults for request slab allocation, if not set by conf parsing |
186 | | */ |
187 | 0 | if (!coord_pair_reg->reuse.child_pool_size) coord_pair_reg->reuse.child_pool_size = REQUEST_POOL_SIZE; |
188 | 0 | if (!coord_pair_reg->reuse.num_children) coord_pair_reg->reuse.num_children = REQUEST_POOL_NUM_OBJECTS; |
189 | |
|
190 | 0 | cp = cf_pair_find(reg_ctx->cs, "virtual_server"); |
191 | 0 | if (!cp) { |
192 | 0 | cf_log_err(reg_ctx->cs, "Missing virtual_server option"); |
193 | 0 | goto fail; |
194 | 0 | } |
195 | | |
196 | 0 | coord_pair_reg->vs = virtual_server_find(cf_pair_value(cp)); |
197 | 0 | if (!coord_pair_reg->vs) { |
198 | 0 | cf_log_err(cp, "Virtual server not found"); |
199 | 0 | goto fail; |
200 | 0 | } |
201 | | |
202 | | /* |
203 | | * Validate that the virtual server uses the correct namespace. |
204 | | */ |
205 | 0 | if (reg_ctx->root->dict != virtual_server_dict_by_cs(virtual_server_cs(coord_pair_reg->vs))) { |
206 | 0 | cf_log_err(cp, "Virtual server has namespace %s, should be %s", |
207 | 0 | fr_dict_root(virtual_server_dict_by_cs(virtual_server_cs(coord_pair_reg->vs)))->name, |
208 | 0 | fr_dict_root(coord_pair_reg->root->dict)->name); |
209 | 0 | goto fail; |
210 | 0 | } |
211 | 0 | coord_pair_reg->attr_packet_type = virtual_server_packet_type_by_cs(virtual_server_cs(coord_pair_reg->vs)); |
212 | |
|
213 | 0 | fr_dlist_insert_tail(coord_pair_regs, coord_pair_reg); |
214 | 0 | talloc_set_destructor(coord_pair_reg, _coord_pair_reg_free); |
215 | |
|
216 | 0 | return coord_pair_reg; |
217 | 0 | } |
218 | | |
219 | | /** Return the coordinator callback ID associated with a coord_pair_reg_t |
220 | | */ |
221 | | uint32_t fr_coord_pair_reg_cb_id(fr_coord_pair_reg_t *coord_pair_reg) |
222 | 0 | { |
223 | 0 | fr_assert(coord_pair_reg); |
224 | 0 | return coord_pair_reg->cb_id; |
225 | 0 | } |
226 | | |
227 | | /* |
228 | | * The following set of callbacks for request handling are mirrors of |
229 | | * their equivalent in worker.c |
230 | | */ |
231 | | |
232 | | /** Signal the unlang interpreter that it needs to stop running the request |
233 | | * |
234 | | * @param[in] request request to cancel. The request may still run to completion. |
235 | | */ |
236 | | static void coord_pair_stop_request(request_t *request) |
237 | 0 | { |
238 | 0 | unlang_interpret_signal(request, FR_SIGNAL_CANCEL); |
239 | 0 | } |
240 | | |
241 | | /** Enforce max_request_time |
242 | | * |
243 | | * @param[in] tl the coordinators's timer list. |
244 | | * @param[in] when the current time |
245 | | * @param[in] uctx the request_t timing out. |
246 | | */ |
247 | | static void _coord_pair_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx) |
248 | 0 | { |
249 | 0 | request_t *request = talloc_get_type_abort(uctx, request_t); |
250 | |
|
251 | 0 | REDEBUG("Request has reached max_request_time - signalling it to stop"); |
252 | 0 | coord_pair_stop_request(request); |
253 | |
|
254 | 0 | request->rcode = RLM_MODULE_TIMEOUT; |
255 | 0 | } |
256 | | |
257 | | /** Set, or re-set the request timer |
258 | | * |
259 | | * @param[in] coord_pair the coord_pair_t containing the timeout lists. |
260 | | * @param[in] request that we're timing out. |
261 | | * @param[in] timeout the timeout to set. |
262 | | * @return |
263 | | * - 0 on success. |
264 | | * - -1 on failure. |
265 | | */ |
266 | | static int fr_coord_pair_request_timeout_set(fr_coord_pair_t *coord_pair, request_t *request, fr_time_delta_t timeout) |
267 | 0 | { |
268 | 0 | if (unlikely(fr_timer_in(request, coord_pair->timeout, &request->timeout, timeout, |
269 | 0 | true, _coord_pair_request_timeout, request) < 0)) { |
270 | 0 | RERROR("Failed to create request timeout timer"); |
271 | 0 | return -1; |
272 | 0 | } |
273 | | |
274 | 0 | return 0; |
275 | 0 | } |
276 | | |
277 | | /** Start time tracking for a request, and mark it as runnable. |
278 | | */ |
279 | | static int coord_pair_request_time_tracking_start(fr_coord_pair_t *coord_pair, request_t *request, fr_time_t now) |
280 | 0 | { |
281 | 0 | fr_assert(!fr_timer_armed(request->timeout)); |
282 | |
|
283 | 0 | if (unlikely(fr_coord_pair_request_timeout_set(coord_pair, request, |
284 | 0 | coord_pair->coord_pair_reg->max_request_time) < 0)) { |
285 | 0 | RERROR("Failed to set request timeout"); |
286 | 0 | return -1; |
287 | 0 | } |
288 | | |
289 | 0 | RDEBUG3("Time tracking started in yielded state"); |
290 | 0 | fr_time_tracking_start(&coord_pair->tracking, &request->async->tracking, now); |
291 | 0 | fr_time_tracking_yield(&request->async->tracking, now); |
292 | 0 | coord_pair->num_active++; |
293 | |
|
294 | 0 | fr_assert(!fr_heap_entry_inserted(request->runnable)); |
295 | 0 | (void) fr_heap_insert(&coord_pair->runnable, request); |
296 | |
|
297 | 0 | return 0; |
298 | 0 | } |
299 | | |
300 | | /** End time tracking for a request |
301 | | */ |
302 | | static void coord_pair_request_time_tracking_end(fr_coord_pair_t *coord_pair, request_t *request, fr_time_t now) |
303 | 0 | { |
304 | 0 | RDEBUG3("Time tracking ended"); |
305 | 0 | fr_time_tracking_end(&coord_pair->predicted, &request->async->tracking, now); |
306 | 0 | fr_assert(coord_pair->num_active > 0); |
307 | 0 | coord_pair->num_active--; |
308 | |
|
309 | 0 | TALLOC_FREE(request->timeout); /* Disarm the request timer */ |
310 | 0 | } |
311 | | |
312 | | |
313 | | static inline CC_HINT(always_inline) |
314 | | void coord_pair_request_init(fr_event_list_t *el, request_t *request, fr_time_t now, void *packet_ctx) |
315 | | { |
316 | | if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false)); |
317 | | if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false)); |
318 | | |
319 | | request->packet->timestamp = now; |
320 | | request->async = talloc_zero(request, fr_async_t); |
321 | | request->async->recv_time = now; |
322 | | request->async->el = el; |
323 | | request->async->packet_ctx = packet_ctx; |
324 | | fr_dlist_entry_init(&request->async->entry); |
325 | | } |
326 | | |
327 | | static inline CC_HINT(always_inline) |
328 | | void coord_pair_request_name_number(request_t *request, char const *name) |
329 | 0 | { |
330 | 0 | request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst); |
331 | 0 | if (request->name) talloc_const_free(request->name); |
332 | 0 | request->name = talloc_asprintf(request, "Coord-%s-%"PRIu64, name, request->number); |
333 | 0 | } |
334 | | |
335 | | static int _coord_pair_request_deinit( request_t *request, UNUSED void *uctx) |
336 | 0 | { |
337 | 0 | return request_slab_deinit(request); |
338 | 0 | } |
339 | | |
340 | | static request_t *coord_pair_request_bootstrap(fr_coord_pair_t *coord_pair, fr_time_t now, void *uctx) |
341 | | { |
342 | | request_t *request; |
343 | | fr_coord_packet_ctx_t *packet_ctx; |
344 | | |
345 | | request = request_slab_reserve(coord_pair->slab); |
346 | | if (!request) { |
347 | | ERROR("Coordinator failed allocating new request"); |
348 | | return NULL; |
349 | | } |
350 | | |
351 | | request_slab_element_set_destructor(request, _coord_pair_request_deinit, coord_pair); |
352 | | |
353 | | if (request_init(request, REQUEST_TYPE_INTERNAL, |
354 | | (&(request_init_args_t){ |
355 | | .namespace = virtual_server_dict_by_cs(virtual_server_cs(coord_pair->coord_pair_reg->vs)) |
356 | | }))) { |
357 | | ERROR("Coordinator failed initializing new request"); |
358 | | request_slab_release(request); |
359 | | return NULL; |
360 | | } |
361 | | |
362 | | MEM(packet_ctx = talloc(request, fr_coord_packet_ctx_t)); |
363 | | *packet_ctx = (fr_coord_packet_ctx_t) { |
364 | | .coord_pair = coord_pair, |
365 | | .uctx = uctx |
366 | | }; |
367 | | coord_pair_request_init(coord_pair->el, request, now, packet_ctx); |
368 | | coord_pair_request_name_number(request, coord_pair->coord_pair_reg->name); |
369 | | |
370 | | unlang_interpret_set(request, coord_pair->intp); |
371 | | |
372 | | return request; |
373 | | } |
374 | | |
375 | | static void coord_pair_request_start(fr_coord_pair_t *coord_pair, request_t *request, fr_time_t now) |
376 | 0 | { |
377 | 0 | fr_pair_t *vp; |
378 | |
|
379 | 0 | vp = fr_pair_find_by_da(&request->request_pairs, NULL, coord_pair->coord_pair_reg->attr_packet_type); |
380 | 0 | if (!vp) { |
381 | 0 | RERROR("Missing %s attribute", coord_pair->coord_pair_reg->attr_packet_type->name); |
382 | 0 | error: |
383 | 0 | request_slab_release(request); |
384 | 0 | return; |
385 | 0 | } |
386 | | |
387 | 0 | request->packet->code = vp->vp_uint32; |
388 | |
|
389 | 0 | if (virtual_server_push(NULL, request, coord_pair->coord_pair_reg->vs, UNLANG_TOP_FRAME) < 0) { |
390 | 0 | RERROR("Protocol failed to set 'process' function"); |
391 | 0 | goto error; |
392 | 0 | } |
393 | | |
394 | 0 | if (unlikely(coord_pair_request_time_tracking_start(coord_pair, request, now) < 0)) { |
395 | 0 | RERROR("Failed to start request time tracking"); |
396 | 0 | goto error; |
397 | 0 | } |
398 | 0 | } |
399 | | |
400 | | static void _coord_pair_request_internal_init(request_t *request, void *uctx) |
401 | 0 | { |
402 | 0 | fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t); |
403 | 0 | fr_time_t now = fr_time(); |
404 | |
|
405 | 0 | fr_assert(request->packet); |
406 | 0 | fr_assert(request->reply); |
407 | |
|
408 | 0 | request->packet->timestamp = now; |
409 | 0 | request->async = talloc_zero(request, fr_async_t); |
410 | 0 | request->async->recv_time = now; |
411 | 0 | request->async->el = coord_pair->el; |
412 | 0 | fr_dlist_entry_init(&request->async->entry); |
413 | | |
414 | | /* |
415 | | * Requests generated by the interpreter |
416 | | * are always marked up as internal. |
417 | | */ |
418 | 0 | fr_assert(request_is_internal(request)); |
419 | 0 | coord_pair_request_time_tracking_start(coord_pair, request, now); |
420 | 0 | } |
421 | | |
422 | | /** External request is now complete - will never happen with coordinators |
423 | | * |
424 | | */ |
425 | | static void _coord_pair_request_done_external(UNUSED request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx) |
426 | 0 | { |
427 | 0 | fr_assert(0); |
428 | 0 | } |
429 | | |
430 | | /** Internal request (i.e. one generated by the interpreter) is now complete |
431 | | * |
432 | | * Whatever generated the request is now responsible for freeing it. |
433 | | */ |
434 | | static void _coord_pair_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx) |
435 | 0 | { |
436 | 0 | fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t); |
437 | |
|
438 | 0 | coord_pair_request_time_tracking_end(coord_pair, request, fr_time()); |
439 | |
|
440 | 0 | fr_assert(!fr_heap_entry_inserted(request->runnable)); |
441 | 0 | fr_assert(!fr_timer_armed(request->timeout)); |
442 | 0 | fr_assert(!fr_dlist_entry_in_list(&request->async->entry)); |
443 | 0 | } |
444 | | |
445 | | /** Detached request (i.e. one generated by the interpreter with no parent) is now complete |
446 | | * |
447 | | * As the request has no parent, then there's nothing to free it |
448 | | * so we have to. |
449 | | */ |
450 | | static void _coord_pair_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx) |
451 | 0 | { |
452 | 0 | fr_assert(!fr_heap_entry_inserted(request->runnable)); |
453 | |
|
454 | 0 | TALLOC_FREE(request->timeout); |
455 | |
|
456 | 0 | fr_assert(!fr_dlist_entry_in_list(&request->async->entry)); |
457 | |
|
458 | 0 | talloc_free(request); |
459 | 0 | } |
460 | | |
461 | | /** Make us responsible for running the request |
462 | | * |
463 | | */ |
464 | | static void _coord_pair_request_detach(request_t *request, void *uctx) |
465 | 0 | { |
466 | 0 | fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t); |
467 | |
|
468 | 0 | RDEBUG4("%s - Request detaching", __FUNCTION__); |
469 | |
|
470 | 0 | if (request_is_detachable(request)) { |
471 | | /* |
472 | | * End the time tracking... We don't track detached requests, |
473 | | * because they don't contribute for the time consumed by an |
474 | | * external request. |
475 | | */ |
476 | 0 | if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) { |
477 | 0 | RDEBUG3("Forcing time tracking to running state, from yielded, for request detach"); |
478 | 0 | fr_time_tracking_resume(&request->async->tracking, fr_time()); |
479 | 0 | } |
480 | 0 | coord_pair_request_time_tracking_end(coord_pair, request, fr_time()); |
481 | |
|
482 | 0 | if (request_detach(request) < 0) RPEDEBUG("Failed detaching request"); |
483 | |
|
484 | 0 | RDEBUG3("Request is detached"); |
485 | 0 | } else { |
486 | 0 | fr_assert_msg(0, "Request is not detachable"); |
487 | 0 | } |
488 | 0 | } |
489 | | |
490 | | /** Request is now runnable |
491 | | * |
492 | | */ |
493 | | static void _coord_pair_request_runnable(request_t *request, void *uctx) |
494 | 0 | { |
495 | 0 | fr_coord_pair_t *coord_pair = uctx; |
496 | |
|
497 | 0 | RDEBUG4("%s - Request marked as runnable", __FUNCTION__); |
498 | 0 | fr_heap_insert(&coord_pair->runnable, request); |
499 | 0 | } |
500 | | |
501 | | /** Interpreter yielded request |
502 | | * |
503 | | */ |
504 | | static void _coord_pair_request_yield(request_t *request, UNUSED void *uctx) |
505 | 0 | { |
506 | 0 | RDEBUG4("%s - Request yielded", __FUNCTION__); |
507 | 0 | if (likely(!request_is_detached(request))) fr_time_tracking_yield(&request->async->tracking, fr_time()); |
508 | 0 | } |
509 | | |
510 | | /** Interpreter is starting to work on request again |
511 | | * |
512 | | */ |
513 | | static void _coord_pair_request_resume(request_t *request, UNUSED void *uctx) |
514 | 0 | { |
515 | 0 | RDEBUG4("%s - Request resuming", __FUNCTION__); |
516 | 0 | if (likely(!request_is_detached(request))) fr_time_tracking_resume(&request->async->tracking, fr_time()); |
517 | 0 | } |
518 | | |
519 | | /** Check if a request is scheduled |
520 | | * |
521 | | */ |
522 | | static bool _coord_pair_request_scheduled(request_t const *request, UNUSED void *uctx) |
523 | 0 | { |
524 | 0 | return fr_heap_entry_inserted(request->runnable); |
525 | 0 | } |
526 | | |
527 | | /** Update a request's priority |
528 | | * |
529 | | */ |
530 | | static void _coord_pair_request_prioritise(request_t *request, void *uctx) |
531 | 0 | { |
532 | 0 | fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t); |
533 | |
|
534 | 0 | RDEBUG4("%s - Request priority changed", __FUNCTION__); |
535 | | |
536 | | /* Extract the request from the runnable queue _if_ it's in the runnable queue */ |
537 | 0 | if (fr_heap_extract(&coord_pair->runnable, request) < 0) return; |
538 | | |
539 | | /* Reinsert it to re-evaluate its new priority */ |
540 | 0 | fr_heap_insert(&coord_pair->runnable, request); |
541 | 0 | } |
542 | | |
543 | | /** Compare two requests by priority and sequence |
544 | | */ |
545 | | static int8_t coord_pair_runnable_cmp(void const *one, void const *two) |
546 | 0 | { |
547 | 0 | request_t const *a = one, *b = two; |
548 | 0 | int ret; |
549 | |
|
550 | 0 | ret = CMP(b->priority, a->priority); |
551 | 0 | if (ret != 0) return ret; |
552 | | |
553 | 0 | return CMP(a->sequence, b->sequence); |
554 | 0 | } |
555 | | |
556 | 0 | void fr_coord_pair_inst_destroy(UNUSED fr_coord_t *coord, fr_coord_cb_inst_t *inst, bool single_thread, UNUSED void *uctx) { |
557 | 0 | fr_coord_pair_t *coord_pair = talloc_get_type_abort(inst->inst_data, fr_coord_pair_t); |
558 | 0 | int ret, count = 0; |
559 | |
|
560 | 0 | if (!single_thread) unlang_interpret_set_thread_default(NULL); |
561 | |
|
562 | 0 | ret = fr_timer_list_force_run(coord_pair->timeout); |
563 | 0 | if (unlikely(ret < 0)) { |
564 | 0 | fr_assert_msg(0, "Failed to force run the timeout list"); |
565 | 0 | } else { |
566 | 0 | count += ret; |
567 | 0 | } |
568 | |
|
569 | 0 | DEBUG("Coordinator is exiting - stopped %u requests", count); |
570 | 0 | } |
571 | | |
572 | | /** Create the coord_pair coord instance data |
573 | | */ |
574 | | static fr_coord_pair_t *fr_coord_pair_create(TALLOC_CTX *ctx, fr_coord_t *coord, fr_event_list_t *el, |
575 | | bool single_thread, void *uctx) |
576 | 0 | { |
577 | 0 | fr_coord_pair_t *coord_pair; |
578 | 0 | fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(uctx, fr_coord_pair_reg_t); |
579 | |
|
580 | 0 | MEM(coord_pair = talloc(ctx, fr_coord_pair_t)); |
581 | 0 | *coord_pair = (fr_coord_pair_t) { |
582 | 0 | .coord = coord, |
583 | 0 | .coord_pair_reg = coord_pair_reg, |
584 | 0 | .el = el |
585 | 0 | }; |
586 | |
|
587 | 0 | coord_pair->runnable = fr_heap_talloc_alloc(coord_pair, coord_pair_runnable_cmp, request_t, runnable, 0); |
588 | 0 | if (!coord_pair->runnable) { |
589 | 0 | fr_strerror_const("Failed creating runnable heap"); |
590 | 0 | fail: |
591 | 0 | talloc_free(coord_pair); |
592 | 0 | return NULL; |
593 | 0 | } |
594 | | |
595 | 0 | coord_pair->timeout = fr_timer_list_ordered_alloc(coord_pair, el->tl); |
596 | 0 | if (!coord_pair->timeout) { |
597 | 0 | fr_strerror_const("Failed creating timeouts list"); |
598 | 0 | goto fail; |
599 | 0 | } |
600 | | |
601 | 0 | coord_pair->intp = unlang_interpret_init(coord_pair, el, |
602 | 0 | &(unlang_request_func_t){ |
603 | 0 | .init_internal = _coord_pair_request_internal_init, |
604 | |
|
605 | 0 | .done_external = _coord_pair_request_done_external, |
606 | 0 | .done_internal = _coord_pair_request_done_internal, |
607 | 0 | .done_detached = _coord_pair_request_done_detached, |
608 | |
|
609 | 0 | .detach = _coord_pair_request_detach, |
610 | 0 | .yield = _coord_pair_request_yield, |
611 | 0 | .resume = _coord_pair_request_resume, |
612 | 0 | .mark_runnable = _coord_pair_request_runnable, |
613 | |
|
614 | 0 | .scheduled = _coord_pair_request_scheduled, |
615 | 0 | .prioritise = _coord_pair_request_prioritise |
616 | 0 | }, coord_pair); |
617 | |
|
618 | 0 | if (!coord_pair->intp) goto fail; |
619 | | |
620 | 0 | if (!(coord_pair->slab = request_slab_list_alloc(coord_pair, el, &coord_pair_reg->reuse, NULL, NULL, |
621 | 0 | coord_pair, true, false))) { |
622 | 0 | goto fail; |
623 | 0 | } |
624 | | |
625 | 0 | if (!single_thread) unlang_interpret_set_thread_default(coord_pair->intp); |
626 | |
|
627 | 0 | return coord_pair; |
628 | 0 | } |
629 | | |
630 | | static inline CC_HINT(always_inline) void coord_run_request(fr_coord_pair_t *coord_pair, fr_time_t start) |
631 | 0 | { |
632 | 0 | request_t *request; |
633 | 0 | fr_time_t now; |
634 | |
|
635 | 0 | now = start; |
636 | |
|
637 | 0 | while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) && |
638 | 0 | ((request = fr_heap_pop(&coord_pair->runnable)) != NULL)) { |
639 | 0 | REQUEST_VERIFY(request); |
640 | 0 | fr_assert(!fr_heap_entry_inserted(request->runnable)); |
641 | |
|
642 | 0 | (void)unlang_interpret(request, UNLANG_REQUEST_RESUME); |
643 | |
|
644 | 0 | now = fr_time(); |
645 | 0 | } |
646 | 0 | } |
647 | | |
648 | | /* |
649 | | * Pre and post events used in single threaded mode |
650 | | */ |
651 | | |
652 | | static int fr_coord_pair_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx) |
653 | 0 | { |
654 | 0 | fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t); |
655 | 0 | request_t *request; |
656 | |
|
657 | 0 | request = fr_heap_peek(coord_pair->runnable); |
658 | 0 | return request ? 1 : 0; |
659 | 0 | } |
660 | | |
661 | | static void fr_coord_pair_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx) |
662 | 0 | { |
663 | 0 | fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t); |
664 | |
|
665 | 0 | coord_run_request(coord_pair, fr_time()); |
666 | 0 | } |
667 | | |
668 | | /** Event callback in multi threaded mode |
669 | | */ |
670 | | static void fr_coord_pair_event(UNUSED fr_event_list_t *el, void *uctx) |
671 | 0 | { |
672 | 0 | fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t); |
673 | |
|
674 | 0 | coord_run_request(coord_pair, fr_time()); |
675 | 0 | } |
676 | | |
677 | | /** Callback run when a coordinator receives pair list data |
678 | | * |
679 | | * Converts the data into a request. |
680 | | */ |
681 | | void fr_coord_pair_data_recv(UNUSED fr_coord_t *coord, uint32_t worker_id, fr_dbuff_t *dbuff, fr_time_t now, void *inst, void *uctx) |
682 | 0 | { |
683 | 0 | fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(uctx, fr_coord_pair_reg_t); |
684 | 0 | fr_coord_pair_t *coord_pair = talloc_get_type_abort(inst, fr_coord_pair_t); |
685 | 0 | request_t *request; |
686 | 0 | fr_pair_t *vp; |
687 | |
|
688 | 0 | request = coord_pair_request_bootstrap(coord_pair, now, coord_pair_reg); |
689 | 0 | if (!request) return; |
690 | | |
691 | 0 | if (fr_pair_append_by_da(request->request_ctx, &vp, &request->request_pairs, attr_worker_id) < 0) { |
692 | 0 | error: |
693 | 0 | request_slab_release(request); |
694 | 0 | return; |
695 | 0 | }; |
696 | 0 | vp->vp_int32 = worker_id; |
697 | |
|
698 | 0 | if (fr_internal_decode_list_dbuff(request->pair_list.request, &request->request_pairs, |
699 | 0 | fr_dict_root(request->proto_dict), dbuff, NULL) < 0) { |
700 | 0 | RERROR("Failed decoding packet"); |
701 | 0 | goto error; |
702 | 0 | } |
703 | | |
704 | 0 | coord_pair_request_start(coord_pair, request, now); |
705 | 0 | } |
706 | | |
707 | | /** Callback run when a worker receives pair list data |
708 | | * |
709 | | * Finds the packet type attribute in the data and calls the callback |
710 | | * registered against the value of that attribute. |
711 | | * |
712 | | * @param cw Worker which received the message. |
713 | | * @param dbuff Data received. |
714 | | * @param now Time the data is received. |
715 | | * @param mctx Module context to pass to callback. |
716 | | * @param uctx The coord_pair registration. |
717 | | */ |
718 | | void fr_coord_worker_pair_data_recv(fr_coord_worker_t *cw, fr_dbuff_t *dbuff, fr_time_t now, module_ctx_t *mctx, void *uctx) |
719 | 0 | { |
720 | 0 | fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(uctx, fr_coord_pair_reg_t); |
721 | 0 | fr_pair_list_t list; |
722 | 0 | fr_pair_t *vp; |
723 | |
|
724 | 0 | fr_pair_list_init(&list); |
725 | 0 | if (fr_internal_decode_list_dbuff(NULL, &list, coord_pair_reg->root, dbuff, NULL) < 0) { |
726 | 0 | PERROR("Failed to decode data as pair list"); |
727 | 0 | goto free; |
728 | 0 | } |
729 | | |
730 | 0 | vp = fr_pair_find_by_da_nested(&list, NULL, coord_pair_reg->attr_packet_type); |
731 | |
|
732 | 0 | if (!vp) { |
733 | 0 | ERROR("Message received without %s", coord_pair_reg->attr_packet_type->name); |
734 | 0 | goto free; |
735 | 0 | } |
736 | | |
737 | 0 | if (vp->vp_uint32 > coord_pair_reg->max_packet_type || !coord_pair_reg->callbacks[vp->vp_uint32]) { |
738 | 0 | ERROR("Message received with invalid value %pP", vp); |
739 | 0 | goto free; |
740 | 0 | } |
741 | | |
742 | 0 | coord_pair_reg->callbacks[vp->vp_uint32]->callback(cw, coord_pair_reg, &list, now, mctx, |
743 | 0 | coord_pair_reg->callbacks[vp->vp_uint32]->uctx); |
744 | |
|
745 | 0 | free: |
746 | 0 | fr_pair_list_free(&list); |
747 | 0 | } |
748 | | |
749 | | /** Send a reply list from a coordinator to a worker |
750 | | * |
751 | | * @param request containing the reply to send. |
752 | | * @param worker_id to send the reply to. |
753 | | * @return |
754 | | * - 0 on success |
755 | | * - -1 on failure |
756 | | */ |
757 | | int fr_coord_to_worker_reply_send(request_t *request, uint32_t worker_id) |
758 | 0 | { |
759 | 0 | fr_dbuff_t dbuff; |
760 | 0 | fr_dbuff_uctx_talloc_t tctx; |
761 | 0 | fr_coord_packet_ctx_t *packet_ctx = talloc_get_type_abort(request->async->packet_ctx, fr_coord_packet_ctx_t); |
762 | 0 | fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(packet_ctx->uctx, fr_coord_pair_reg_t); |
763 | 0 | int ret; |
764 | |
|
765 | 0 | if (fr_dbuff_init_talloc(NULL, &dbuff, &tctx, 1024, SIZE_MAX) == NULL) return -1; |
766 | 0 | if (fr_internal_encode_list(&dbuff, &request->reply_pairs, NULL) < 0) { |
767 | 0 | fr_dbuff_free_talloc(&dbuff); |
768 | 0 | return -1; |
769 | 0 | } |
770 | | |
771 | 0 | ret = fr_coord_to_worker_send(packet_ctx->coord_pair->coord, worker_id, coord_pair_reg->cb_id, &dbuff); |
772 | |
|
773 | 0 | fr_dbuff_free_talloc(&dbuff); |
774 | |
|
775 | 0 | return ret; |
776 | 0 | } |
777 | | |
778 | | /** Send a reply list from a coordinator to all workers |
779 | | * |
780 | | * @param request containing the reply to send. |
781 | | * @return |
782 | | * - 0 on success |
783 | | * - < 0 on failure |
784 | | */ |
785 | | int fr_coord_to_worker_reply_broadcast(request_t *request) |
786 | 0 | { |
787 | 0 | fr_dbuff_t dbuff; |
788 | 0 | fr_dbuff_uctx_talloc_t tctx; |
789 | 0 | fr_coord_packet_ctx_t *packet_ctx = talloc_get_type_abort(request->async->packet_ctx, fr_coord_packet_ctx_t); |
790 | 0 | fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(packet_ctx->uctx, fr_coord_pair_reg_t); |
791 | 0 | int ret; |
792 | |
|
793 | 0 | if (fr_dbuff_init_talloc(NULL, &dbuff, &tctx, 1024, SIZE_MAX) == NULL) return -1; |
794 | 0 | if (fr_internal_encode_list(&dbuff, &request->reply_pairs, NULL) < 0) { |
795 | 0 | fr_dbuff_free_talloc(&dbuff); |
796 | 0 | return -1; |
797 | 0 | } |
798 | | |
799 | 0 | ret = fr_coord_to_worker_broadcast(packet_ctx->coord_pair->coord, coord_pair_reg->cb_id, &dbuff); |
800 | |
|
801 | 0 | fr_dbuff_free_talloc(&dbuff); |
802 | |
|
803 | 0 | return ret; |
804 | 0 | } |
805 | | |
806 | | /** Send a pair list from a worker to a coordinator |
807 | | * |
808 | | * The pair list must include an attribute indicating the packet type |
809 | | * |
810 | | * @param cw The coord worker sending the data. |
811 | | * @param coord_pair_reg The coord_pair registration to use. |
812 | | * @param list of pairs to send. |
813 | | * @return |
814 | | * - 0 on success |
815 | | * - -1 on failure |
816 | | */ |
817 | | int fr_worker_to_coord_pair_send(fr_coord_worker_t *cw, fr_coord_pair_reg_t *coord_pair_reg, fr_pair_list_t *list) |
818 | 0 | { |
819 | 0 | fr_dbuff_t dbuff; |
820 | 0 | fr_dbuff_uctx_talloc_t tctx; |
821 | 0 | int ret; |
822 | |
|
823 | 0 | if (fr_dbuff_init_talloc(NULL, &dbuff, &tctx, 1024, SIZE_MAX) == NULL) return -1; |
824 | 0 | if (fr_internal_encode_list(&dbuff, list, NULL) < 0) { |
825 | 0 | fr_dbuff_free_talloc(&dbuff); |
826 | 0 | return -1; |
827 | 0 | } |
828 | | |
829 | 0 | ret = fr_worker_to_coord_send(cw, coord_pair_reg->cb_id, &dbuff); |
830 | |
|
831 | 0 | fr_dbuff_free_talloc(&dbuff); |
832 | 0 | return ret; |
833 | 0 | } |
834 | | |
835 | | /** Instance creation called during coordinator creation. |
836 | | * |
837 | | * @param ctx to allocate the instance in. |
838 | | * @param coord Coordinator to create an instance of. |
839 | | * @param el Event list for instance to use. |
840 | | * @param single_thread is the server in single thread mode. |
841 | | * @param uctx configured for the callback this instance relates to. |
842 | | * @return |
843 | | * - fr_coord_cb_inst_t on success |
844 | | * - NULL on failure |
845 | | */ |
846 | | fr_coord_cb_inst_t *fr_coord_pair_inst_create(TALLOC_CTX *ctx, fr_coord_t *coord, fr_event_list_t *el, |
847 | | bool single_thread, void *uctx) |
848 | 0 | { |
849 | 0 | fr_coord_cb_inst_t *cb_inst; |
850 | 0 | fr_coord_pair_t *coord_pair; |
851 | |
|
852 | 0 | MEM(cb_inst = talloc(ctx, fr_coord_cb_inst_t)); |
853 | |
|
854 | 0 | *cb_inst = (fr_coord_cb_inst_t) { |
855 | 0 | .event_pre_cb = fr_coord_pair_pre_event, |
856 | 0 | .event_post_cb = fr_coord_pair_post_event, |
857 | 0 | .event_cb = fr_coord_pair_event |
858 | 0 | }; |
859 | |
|
860 | 0 | coord_pair = fr_coord_pair_create(ctx, coord, el, single_thread, uctx); |
861 | 0 | if (!coord_pair) { |
862 | 0 | talloc_free(cb_inst); |
863 | 0 | return NULL; |
864 | 0 | } |
865 | | |
866 | 0 | cb_inst->inst_data = coord_pair; |
867 | |
|
868 | 0 | return cb_inst; |
869 | 0 | } |
870 | | |
871 | | /** Return the coord_pair associated with a coord_pair internal request |
872 | | * |
873 | | * @param request to fetch associated coordinator for. |
874 | | * @return fr_coord_t |
875 | | */ |
876 | | fr_coord_pair_t *fr_coord_pair_request_coord_pair(request_t *request) |
877 | 0 | { |
878 | 0 | fr_coord_packet_ctx_t *packet_ctx = talloc_get_type_abort(request->async->packet_ctx, fr_coord_packet_ctx_t); |
879 | 0 | return packet_ctx->coord_pair; |
880 | 0 | } |
881 | | |
882 | | /** Start a coordinator request to run through a coord_pair process module |
883 | | * |
884 | | * @param coord_pair with the process module to run the request. |
885 | | * @param list Pairs to populate the request. |
886 | | * @param now Request time. |
887 | | * @return |
888 | | * 0 on success |
889 | | * -1 on failure |
890 | | */ |
891 | | int fr_coord_pair_coord_request_start(fr_coord_pair_t *coord_pair, fr_pair_list_t *list, fr_time_t now) |
892 | 0 | { |
893 | 0 | request_t *request; |
894 | |
|
895 | 0 | request = coord_pair_request_bootstrap(coord_pair, now, coord_pair->coord_pair_reg); |
896 | 0 | if (!request) return -1; |
897 | | |
898 | 0 | fr_pair_list_steal(request->request_ctx, list); |
899 | 0 | fr_pair_list_append(&request->request_pairs, list); |
900 | |
|
901 | 0 | coord_pair_request_start(coord_pair, request, now); |
902 | 0 | return 0; |
903 | 0 | } |